Fix sub2eth synchronization (#172)

* ease serde version requirements (to build OE with builtin)

* trace + fix completion notifications

* check incompletion on submit

* fix compilation

* do not ask for synced blocks when queue is empty

* cargo fmt --all

* Update relays/ethereum/src/ethereum_client.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* remove closure

* fn submit_substrate_header() -> Option<RpcError>

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2020-07-06 10:49:31 +03:00
committed by Bastian Köcher
parent b5bdbeb518
commit 456515e08b
12 changed files with 418 additions and 65 deletions
+1 -1
View File
@@ -24,7 +24,7 @@ num-traits = "0.2"
parity-crypto = { version = "0.6", features = ["publickey"] }
parking_lot = "0.11.0"
rustc-hex = "2.0.1"
serde = { version = "1.0.114", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.55"
sp-bridge-eth-poa = { path = "../../primitives/ethereum-poa" }
time = "0.2"
@@ -96,6 +96,25 @@
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "bytes",
"name": "rawHeader",
"type": "bytes"
}
],
"name": "isIncompleteHeader",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
File diff suppressed because one or more lines are too long
@@ -1,5 +1,5 @@
Last Change Date: 2020-05-01
Last Change Date: 2020-07-03
Solc version: 0.6.6+commit.6c089d02
Source hash (keccak256): 0x36403636ad41082ca6c937c60ab06446cd9ef7036c178fa2f04d7c8286544d39
Source gist: https://github.com/svyatonik/substrate-bridge-sol/blob/8b54f5f648f8685fecd52b7af1deb277922b0fc3/substrate-bridge.sol
Source hash (keccak256): 0x3e6339beefe6786f4f26b408d4f727e03c6fd9630d692af9a7f6b46143fa308f
Source gist: https://github.com/svyatonik/substrate-bridge-sol/blob/1d0fa475a2ba3a70a47ed2dd870568c42ec16c8c/substrate-bridge.sol
Compiler flags used (command to produce the file): `docker run -i ethereum/solc:0.6.6 --optimize --bin - < substrate-bridge.sol`
+231 -20
View File
@@ -20,7 +20,7 @@ use crate::ethereum_types::{
use crate::rpc::{Ethereum, EthereumRpc};
use crate::rpc_errors::{EthereumNodeError, RpcError};
use crate::substrate_types::{GrandpaJustification, Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId};
use crate::sync_types::HeaderId;
use crate::sync_types::{HeaderId, MaybeConnectionError, SubmittedHeaders};
use async_trait::async_trait;
use codec::{Decode, Encode};
@@ -30,7 +30,7 @@ use jsonrpsee::transport::http::HttpTransportClient;
use jsonrpsee::Client;
use parity_crypto::publickey::KeyPair;
use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};
// to encode/decode contract calls
ethabi_contract::use_contract!(bridge_contract, "res/substrate-bridge-abi.json");
@@ -170,7 +170,7 @@ pub trait EthereumHighLevelRpc: EthereumRpc {
params: EthereumSigningParams,
contract_address: Address,
headers: Vec<QueuedSubstrateHeader>,
) -> Result<Vec<SubstrateHeaderId>>;
) -> SubmittedHeaders<SubstrateHeaderId, RpcError>;
/// Returns ids of incomplete Substrate headers.
async fn incomplete_substrate_headers(&self, contract_address: Address) -> Result<HashSet<SubstrateHeaderId>>;
@@ -246,25 +246,35 @@ impl EthereumHighLevelRpc for EthereumRpcClient {
params: EthereumSigningParams,
contract_address: Address,
headers: Vec<QueuedSubstrateHeader>,
) -> Result<Vec<SubstrateHeaderId>> {
) -> SubmittedHeaders<SubstrateHeaderId, RpcError> {
// read nonce of signer
let address: Address = params.signer.address().as_fixed_bytes().into();
let mut nonce = self.account_nonce(address).await?;
let nonce = match self.account_nonce(address).await {
Ok(nonce) => nonce,
Err(error) => {
return SubmittedHeaders {
submitted: Vec::new(),
incomplete: Vec::new(),
rejected: headers.iter().rev().map(|header| header.id()).collect(),
fatal_error: Some(error),
}
}
};
let ids = headers.iter().map(|header| header.id()).collect();
for header in headers {
self.submit_ethereum_transaction(
&params,
Some(contract_address),
Some(nonce),
false,
bridge_contract::functions::import_header::encode_input(header.header().encode()),
)
.await?;
nonce += 1.into();
}
Ok(ids)
// submit headers. Note that we're cloning self here. It is ok, because
// cloning `jsonrpsee::Client` only clones reference to background threads
submit_substrate_headers(
EthereumHeadersSubmitter {
client: EthereumRpcClient {
client: self.client.clone(),
},
params,
contract_address,
nonce,
},
headers,
)
.await
}
async fn incomplete_substrate_headers(&self, contract_address: Address) -> Result<HashSet<SubstrateHeaderId>> {
@@ -363,3 +373,204 @@ impl EthereumHighLevelRpc for EthereumRpcClient {
Ok((id, transaction_receipts))
}
}
/// Substrate headers submitter API.
#[async_trait]
trait HeadersSubmitter {
/// Returns Ok(true) if not-yet-imported header is incomplete.
/// Returns Ok(false) if not-yet-imported header is complete.
///
/// Returns Err(()) if contract has rejected header. This probably means
/// that the header is already imported by the contract.
async fn is_header_incomplete(&self, header: &QueuedSubstrateHeader) -> Result<bool>;
/// Submit given header to Ethereum node.
async fn submit_header(&mut self, header: QueuedSubstrateHeader) -> Result<()>;
}
/// Implementation of Substrate headers submitter that sends headers to running Ethereum node.
struct EthereumHeadersSubmitter {
client: EthereumRpcClient,
params: EthereumSigningParams,
contract_address: Address,
nonce: U256,
}
#[async_trait]
impl HeadersSubmitter for EthereumHeadersSubmitter {
async fn is_header_incomplete(&self, header: &QueuedSubstrateHeader) -> Result<bool> {
let (encoded_call, call_decoder) =
bridge_contract::functions::is_incomplete_header::call(header.header().encode());
let call_request = CallRequest {
to: Some(self.contract_address),
data: Some(encoded_call.into()),
..Default::default()
};
let call_result = self.client.eth_call(call_request).await?;
let is_incomplete = call_decoder.decode(&call_result.0)?;
Ok(is_incomplete)
}
async fn submit_header(&mut self, header: QueuedSubstrateHeader) -> Result<()> {
let result = self
.client
.submit_ethereum_transaction(
&self.params,
Some(self.contract_address),
Some(self.nonce),
false,
bridge_contract::functions::import_header::encode_input(header.header().encode()),
)
.await;
if result.is_ok() {
self.nonce += U256::one();
}
result
}
}
/// Submit multiple Substrate headers.
async fn submit_substrate_headers(
mut header_submitter: impl HeadersSubmitter,
headers: Vec<QueuedSubstrateHeader>,
) -> SubmittedHeaders<SubstrateHeaderId, RpcError> {
let mut ids = headers.iter().map(|header| header.id()).collect::<VecDeque<_>>();
let mut submitted_headers = SubmittedHeaders::default();
for header in headers {
let id = ids.pop_front().expect("both collections have same size; qed");
submitted_headers.fatal_error =
submit_substrate_header(&mut header_submitter, &mut submitted_headers, id, header).await;
if submitted_headers.fatal_error.is_some() {
submitted_headers.rejected.extend(ids);
break;
}
}
submitted_headers
}
/// Submit single Substrate header.
async fn submit_substrate_header(
header_submitter: &mut impl HeadersSubmitter,
submitted_headers: &mut SubmittedHeaders<SubstrateHeaderId, RpcError>,
id: SubstrateHeaderId,
header: QueuedSubstrateHeader,
) -> Option<RpcError> {
// if parent of this header is either incomplete, or rejected, we assume that contract
// will reject this header as well
let parent_id = header.parent_id();
if submitted_headers.rejected.contains(&parent_id) || submitted_headers.incomplete.contains(&parent_id) {
submitted_headers.rejected.push(id);
return None;
}
// check if this header is incomplete
let is_header_incomplete = match header_submitter.is_header_incomplete(&header).await {
Ok(true) => true,
Ok(false) => false,
Err(error) => {
// contract has rejected this header => we do not want to submit it
submitted_headers.rejected.push(id);
if error.is_connection_error() {
return Some(error);
} else {
return None;
}
}
};
// submit header and update submitted headers
match header_submitter.submit_header(header).await {
Ok(_) => {
submitted_headers.submitted.push(id);
if is_header_incomplete {
submitted_headers.incomplete.push(id);
}
None
}
Err(error) => {
submitted_headers.rejected.push(id);
Some(error)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::substrate_types::{Header as SubstrateHeader, Number as SubstrateBlockNumber};
use sp_runtime::traits::Header;
struct TestHeadersSubmitter {
incomplete: Vec<SubstrateHeaderId>,
failed: Vec<SubstrateHeaderId>,
}
#[async_trait]
impl HeadersSubmitter for TestHeadersSubmitter {
async fn is_header_incomplete(&self, header: &QueuedSubstrateHeader) -> Result<bool> {
if self.incomplete.iter().any(|i| i.0 == header.id().0) {
Ok(true)
} else {
Ok(false)
}
}
async fn submit_header(&mut self, header: QueuedSubstrateHeader) -> Result<()> {
if self.failed.iter().any(|i| i.0 == header.id().0) {
Err(RpcError::Ethereum(EthereumNodeError::InvalidSubstrateBlockNumber))
} else {
Ok(())
}
}
}
fn header(number: SubstrateBlockNumber) -> QueuedSubstrateHeader {
QueuedSubstrateHeader::new(SubstrateHeader::new(
number,
Default::default(),
Default::default(),
if number == 0 {
Default::default()
} else {
header(number - 1).id().1
},
Default::default(),
))
}
#[test]
fn descendants_of_incomplete_headers_are_not_submitted() {
let submitted_headers = async_std::task::block_on(submit_substrate_headers(
TestHeadersSubmitter {
incomplete: vec![header(5).id()],
failed: vec![],
},
vec![header(5), header(6)],
));
assert_eq!(submitted_headers.submitted, vec![header(5).id()]);
assert_eq!(submitted_headers.incomplete, vec![header(5).id()]);
assert_eq!(submitted_headers.rejected, vec![header(6).id()]);
assert!(submitted_headers.fatal_error.is_none());
}
#[test]
fn headers_after_fatal_error_are_not_submitted() {
let submitted_headers = async_std::task::block_on(submit_substrate_headers(
TestHeadersSubmitter {
incomplete: vec![],
failed: vec![header(6).id()],
},
vec![header(5), header(6), header(7)],
));
assert_eq!(submitted_headers.submitted, vec![header(5).id()]);
assert_eq!(submitted_headers.incomplete, vec![]);
assert_eq!(submitted_headers.rejected, vec![header(6).id(), header(7).id()]);
assert!(submitted_headers.fatal_error.is_some());
}
}
@@ -26,7 +26,7 @@ use crate::substrate_client::{
use crate::substrate_types::into_substrate_ethereum_header;
use crate::sync::{HeadersSyncParams, TargetTransactionMode};
use crate::sync_loop::{SourceClient, TargetClient};
use crate::sync_types::SourceHeader;
use crate::sync_types::{SourceHeader, SubmittedHeaders};
use async_trait::async_trait;
use web3::types::H256;
@@ -155,7 +155,10 @@ impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
Ok((id, self.client.ethereum_header_known(id).await?))
}
async fn submit_headers(&self, headers: Vec<QueuedEthereumHeader>) -> Result<Vec<EthereumHeaderId>, Self::Error> {
async fn submit_headers(
&self,
headers: Vec<QueuedEthereumHeader>,
) -> SubmittedHeaders<EthereumHeaderId, Self::Error> {
let (sign_params, sign_transactions) = (self.sign_params.clone(), self.sign_transactions.clone());
self.client
.submit_ethereum_headers(sign_params, headers, sign_transactions)
+43 -12
View File
@@ -62,6 +62,8 @@ pub struct QueuedHeaders<P: HeadersSyncPipeline> {
incomplete_headers: LinkedHashMap<HeaderId<P::Hash, P::Number>, Option<Instant>>,
/// Headers that are waiting to be completed at target node. Auto-sorted by insertion time.
completion_data: LinkedHashMap<HeaderId<P::Hash, P::Number>, P::Completion>,
/// Best synced block number.
best_synced_number: P::Number,
/// Pruned blocks border. We do not store or accept any blocks with number less than
/// this number.
prune_border: P::Number,
@@ -90,6 +92,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
known_headers: KnownHeaders::<P>::new(),
incomplete_headers: LinkedHashMap::new(),
completion_data: LinkedHashMap::new(),
best_synced_number: Zero::zero(),
prune_border: Zero::zero(),
}
}
@@ -158,6 +161,12 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
)
}
/// Returns number of best synced block we have ever seen. It is either less
/// than `best_queued_number()`, or points to last synced block if queue is empty.
pub fn best_synced_number(&self) -> P::Number {
self.best_synced_number
}
/// Returns synchronization status of the header.
pub fn status(&self, id: &HeaderId<P::Hash, P::Number>) -> HeaderStatus {
self.known_headers
@@ -328,10 +337,23 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
pub fn completion_response(&mut self, id: &HeaderId<P::Hash, P::Number>, completion: Option<P::Completion>) {
let completion = match completion {
Some(completion) => completion,
None => return, // we'll try refetch later
None => {
log::debug!(
target: "bridge",
"{} Node is still missing completion data for header: {:?}. Will retry later.",
P::SOURCE_NAME,
id,
);
return;
}
};
if self.incomplete_headers.remove(id).is_some() {
// do not remove from `incomplete_headers` here, because otherwise we'll miss
// completion 'notification'
// this could lead to duplicate completion retrieval (if completion transaction isn't mined
// for too long)
if self.incomplete_headers.get(id).is_some() {
log::debug!(
target: "bridge",
"Received completion data from {} for header: {:?}",
@@ -381,15 +403,8 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
}
}
/// When incomplete headers ids are receved from target node.
pub fn incomplete_headers_response(&mut self, ids: HashSet<HeaderId<P::Hash, P::Number>>) {
// all new incomplete headers are marked Synced and all their descendants
// are moved from Ready/Submitted to Incomplete queue
let new_incomplete_headers = ids
.iter()
.filter(|id| !self.incomplete_headers.contains_key(id) && !self.completion_data.contains_key(id))
.cloned()
.collect::<Vec<_>>();
/// Marks given headers incomplete.
pub fn add_incomplete_headers(&mut self, new_incomplete_headers: Vec<HeaderId<P::Hash, P::Number>>) {
for new_incomplete_header in new_incomplete_headers {
self.header_synced(&new_incomplete_header);
move_header_descendants::<P>(
@@ -408,6 +423,18 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
self.incomplete_headers.insert(new_incomplete_header, None);
}
}
/// When incomplete headers ids are receved from target node.
pub fn incomplete_headers_response(&mut self, ids: HashSet<HeaderId<P::Hash, P::Number>>) {
// all new incomplete headers are marked Synced and all their descendants
// are moved from Ready/Submitted to Incomplete queue
let new_incomplete_headers = ids
.iter()
.filter(|id| !self.incomplete_headers.contains_key(id) && !self.completion_data.contains_key(id))
.cloned()
.collect::<Vec<_>>();
self.add_incomplete_headers(new_incomplete_headers);
// for all headers that were incompleted previously, but now are completed, we move
// all descendants from incomplete to ready
@@ -487,6 +514,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
self.incomplete.clear();
self.submitted.clear();
self.known_headers.clear();
self.best_synced_number = Zero::zero();
self.prune_border = Zero::zero();
}
@@ -519,6 +547,9 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
/// When we receive new Synced header from target node.
fn header_synced(&mut self, id: &HeaderId<P::Hash, P::Number>) {
// update best synced block number
self.best_synced_number = std::cmp::max(self.best_synced_number, id.0);
// all ancestors of this header are now synced => let's remove them from
// queues
let mut current = *id;
@@ -1272,7 +1303,7 @@ pub(crate) mod tests {
// when response is Some, we're scheduling completion
queue.completion_response(&id(200), Some(()));
assert_eq!(queue.incomplete_headers.len(), 1);
assert_eq!(queue.incomplete_headers.len(), 2);
assert_eq!(queue.completion_data.len(), 1);
assert!(queue.incomplete_headers.contains_key(&id(100)));
assert!(queue.completion_data.contains_key(&id(200)));
+43 -16
View File
@@ -21,7 +21,7 @@ use crate::substrate_types::{
into_substrate_ethereum_header, into_substrate_ethereum_receipts, Hash, Header as SubstrateHeader, Number,
SignedBlock as SignedSubstrateBlock,
};
use crate::sync_types::HeaderId;
use crate::sync_types::{HeaderId, SubmittedHeaders};
use async_trait::async_trait;
use codec::{Decode, Encode};
@@ -32,6 +32,7 @@ use num_traits::Zero;
use sp_bridge_eth_poa::Header as SubstrateEthereumHeader;
use sp_core::crypto::Pair;
use sp_runtime::traits::IdentifyAccount;
use std::collections::VecDeque;
const ETH_API_IMPORT_REQUIRES_RECEIPTS: &str = "EthereumHeadersApi_is_import_requires_receipts";
const ETH_API_IS_KNOWN_BLOCK: &str = "EthereumHeadersApi_is_known_block";
@@ -193,20 +194,20 @@ pub trait SubmitEthereumHeaders: SubstrateRpc {
params: SubstrateSigningParams,
headers: Vec<QueuedEthereumHeader>,
sign_transactions: bool,
) -> Result<Vec<EthereumHeaderId>>;
) -> SubmittedHeaders<EthereumHeaderId, RpcError>;
/// Submits signed Ethereum header to Substrate runtime.
async fn submit_signed_ethereum_headers(
&self,
params: SubstrateSigningParams,
headers: Vec<QueuedEthereumHeader>,
) -> Result<Vec<EthereumHeaderId>>;
) -> SubmittedHeaders<EthereumHeaderId, RpcError>;
/// Submits unsigned Ethereum header to Substrate runtime.
async fn submit_unsigned_ethereum_headers(
&self,
headers: Vec<QueuedEthereumHeader>,
) -> Result<Vec<EthereumHeaderId>>;
) -> SubmittedHeaders<EthereumHeaderId, RpcError>;
}
#[async_trait]
@@ -216,7 +217,7 @@ impl SubmitEthereumHeaders for SubstrateRpcClient {
params: SubstrateSigningParams,
headers: Vec<QueuedEthereumHeader>,
sign_transactions: bool,
) -> Result<Vec<EthereumHeaderId>> {
) -> SubmittedHeaders<EthereumHeaderId, RpcError> {
if sign_transactions {
self.submit_signed_ethereum_headers(params, headers).await
} else {
@@ -228,29 +229,55 @@ impl SubmitEthereumHeaders for SubstrateRpcClient {
&self,
params: SubstrateSigningParams,
headers: Vec<QueuedEthereumHeader>,
) -> Result<Vec<EthereumHeaderId>> {
) -> SubmittedHeaders<EthereumHeaderId, RpcError> {
let ids = headers.iter().map(|header| header.id()).collect();
let submission_result = async {
let account_id = params.signer.public().as_array_ref().clone().into();
let nonce = self.next_account_index(account_id).await?;
let account_id = params.signer.public().as_array_ref().clone().into();
let nonce = self.next_account_index(account_id).await?;
let transaction = create_signed_submit_transaction(headers, &params.signer, nonce, self.genesis_hash);
let _ = self.submit_extrinsic(Bytes(transaction.encode())).await?;
Ok(())
}
.await;
let transaction = create_signed_submit_transaction(headers, &params.signer, nonce, self.genesis_hash);
let _ = self.submit_extrinsic(Bytes(transaction.encode())).await?;
Ok(ids)
match submission_result {
Ok(_) => SubmittedHeaders {
submitted: ids,
incomplete: Vec::new(),
rejected: Vec::new(),
fatal_error: None,
},
Err(error) => SubmittedHeaders {
submitted: Vec::new(),
incomplete: Vec::new(),
rejected: ids,
fatal_error: Some(error),
},
}
}
async fn submit_unsigned_ethereum_headers(
&self,
headers: Vec<QueuedEthereumHeader>,
) -> Result<Vec<EthereumHeaderId>> {
let ids = headers.iter().map(|header| header.id()).collect();
) -> SubmittedHeaders<EthereumHeaderId, RpcError> {
let mut ids = headers.iter().map(|header| header.id()).collect::<VecDeque<_>>();
let mut submitted_headers = SubmittedHeaders::default();
for header in headers {
let id = ids.pop_front().expect("both collections have same size; qed");
let transaction = create_unsigned_submit_transaction(header);
let _ = self.submit_extrinsic(Bytes(transaction.encode())).await?;
match self.submit_extrinsic(Bytes(transaction.encode())).await {
Ok(_) => submitted_headers.submitted.push(id),
Err(error) => {
submitted_headers.rejected.push(id);
submitted_headers.rejected.extend(ids);
submitted_headers.fatal_error = Some(error);
break;
}
}
}
Ok(ids)
submitted_headers
}
}
@@ -28,7 +28,7 @@ use crate::substrate_types::{
};
use crate::sync::{HeadersSyncParams, TargetTransactionMode};
use crate::sync_loop::{SourceClient, TargetClient};
use crate::sync_types::SourceHeader;
use crate::sync_types::{SourceHeader, SubmittedHeaders};
use async_trait::async_trait;
@@ -165,7 +165,10 @@ impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
self.client.substrate_header_known(self.contract, id).await
}
async fn submit_headers(&self, headers: Vec<QueuedSubstrateHeader>) -> Result<Vec<SubstrateHeaderId>, Self::Error> {
async fn submit_headers(
&self,
headers: Vec<QueuedSubstrateHeader>,
) -> SubmittedHeaders<SubstrateHeaderId, Self::Error> {
self.client
.submit_substrate_headers(self.sign_params.clone(), self.contract, headers)
.await
+24 -1
View File
@@ -113,7 +113,10 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
}
// we assume that there were no reorgs if we have already downloaded best header
let best_downloaded_number = std::cmp::max(self.headers.best_queued_number(), target_best_header.0);
let best_downloaded_number = std::cmp::max(
std::cmp::max(self.headers.best_queued_number(), self.headers.best_synced_number()),
target_best_header.0,
);
if best_downloaded_number == source_best_number {
return None;
}
@@ -257,6 +260,26 @@ mod tests {
assert_eq!(eth_sync.select_new_header_to_download(), None);
}
#[test]
fn select_new_header_to_download_works_with_empty_queue() {
let mut eth_sync = HeadersSync::<EthereumHeadersSyncPipeline>::new(default_sync_params());
eth_sync.source_best_header_number_response(100);
// when queue is not empty => everything goes as usually
eth_sync.target_best_header_response(header(10).id());
eth_sync.headers_mut().header_response(header(11).header().clone());
eth_sync.headers_mut().maybe_extra_response(&header(11).id(), false);
assert_eq!(eth_sync.select_new_header_to_download(), Some(12));
// but then queue is drained
eth_sync.headers_mut().target_best_header_response(&header(11).id());
// even though it's empty, we know that header#11 is synced
assert_eq!(eth_sync.headers().best_queued_number(), 0);
assert_eq!(eth_sync.headers().best_synced_number(), 11);
assert_eq!(eth_sync.select_new_header_to_download(), Some(12));
}
#[test]
fn sync_without_reorgs_works() {
let mut eth_sync = HeadersSync::new(default_sync_params());
+16 -7
View File
@@ -15,7 +15,9 @@
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::sync::HeadersSyncParams;
use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, MaybeConnectionError, QueuedHeader};
use crate::sync_types::{
HeaderId, HeaderStatus, HeadersSyncPipeline, MaybeConnectionError, QueuedHeader, SubmittedHeaders,
};
use async_trait::async_trait;
use futures::{future::FutureExt, stream::StreamExt};
@@ -91,7 +93,7 @@ pub trait TargetClient<P: HeadersSyncPipeline>: Sized {
async fn submit_headers(
&self,
headers: Vec<QueuedHeader<P>>,
) -> Result<Vec<HeaderId<P::Hash, P::Number>>, Self::Error>;
) -> SubmittedHeaders<HeaderId<P::Hash, P::Number>, Self::Error>;
/// Returns ID of headers that require to be 'completed' before children can be submitted.
async fn incomplete_headers_ids(&self) -> Result<HashSet<HeaderId<P::Hash, P::Number>>, Self::Error>;
@@ -111,10 +113,10 @@ pub trait TargetClient<P: HeadersSyncPipeline>: Sized {
}
/// Run headers synchronization.
pub fn run<P: HeadersSyncPipeline>(
pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
source_client: impl SourceClient<P>,
source_tick: Duration,
target_client: impl TargetClient<P>,
target_client: TC,
target_tick: Duration,
sync_params: HeadersSyncParams,
) {
@@ -288,14 +290,21 @@ pub fn run<P: HeadersSyncPipeline>(
|| format!("Error retrieving existence status from {} node", P::TARGET_NAME),
);
},
target_submit_header_result = target_submit_header_future => {
submitted_headers = target_submit_header_future => {
// following line helps Rust understand the type of `submitted_headers` :/
let submitted_headers: SubmittedHeaders<HeaderId<P::Hash, P::Number>, TC::Error> = submitted_headers;
let maybe_fatal_error = submitted_headers.fatal_error.map(Err).unwrap_or(Ok(()));
target_client_is_online = process_future_result(
target_submit_header_result,
|submitted_headers| sync.headers_mut().headers_submitted(submitted_headers),
maybe_fatal_error,
|_| {},
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|| format!("Error submitting headers to {} node", P::TARGET_NAME),
);
sync.headers_mut().headers_submitted(submitted_headers.submitted);
sync.headers_mut().add_incomplete_headers(submitted_headers.incomplete);
},
target_complete_header_result = target_complete_header_future => {
target_client_is_online = process_future_result(
+27
View File
@@ -161,3 +161,30 @@ impl<P: HeadersSyncPipeline> QueuedHeader<P> {
&self.extra
}
}
/// Headers submission result.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct SubmittedHeaders<Id, Error> {
/// IDs of headers that have been submitted to target node.
pub submitted: Vec<Id>,
/// IDs of incomplete headers. These headers were submitted (so this id is also in `submitted` vec),
/// but all descendants are not.
pub incomplete: Vec<Id>,
/// IDs of ignored headers that we have decided not to submit (they're either rejected by
/// target node immediately, or they're descendants of incomplete headers).
pub rejected: Vec<Id>,
/// Fatal target node error, if it has occured during submission.
pub fatal_error: Option<Error>,
}
impl<Id, Error> Default for SubmittedHeaders<Id, Error> {
fn default() -> Self {
SubmittedHeaders {
submitted: Vec::new(),
incomplete: Vec::new(),
rejected: Vec::new(),
fatal_error: None,
}
}
}