Prepare for sub to eth relay - part2 (#253)

* fixed PoA contract deploy (granda_authorities call)

* pause if all submitted headers were rejected

* give funds to Bertha and Carlos

* max 1 active PoA transaction in headers sync :(

* display initial header id when deploying PoA contract

* cargo fmt + clipy

* update PoA contract to accept <= 4 Substrate headers at once

* pause submitting headers when contract rejects all new headers + we have active transactions

* fix compilation

* cargo fmt --all

* does_not_select_new_headers_to_submit_when_submit_is_paused

* updated bridge contract

* Update relays/ethereum/src/sync.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* WiP.

* Submit first incomplete header.

* Finish up test.

* cargo fmt --all

* Remove redundant clone.

* Address review comments.

* cargo fmt --all

* Fix clippy.

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
Co-authored-by: Tomasz Drwięga <tomasz@parity.io>
This commit is contained in:
Svyatoslav Nikolsky
2020-08-05 17:41:46 +03:00
committed by Bastian Köcher
parent 7f8360d8ab
commit 868814e4b6
9 changed files with 433 additions and 108 deletions
@@ -69,11 +69,26 @@
"inputs": [
{
"internalType": "bytes",
"name": "rawHeader",
"name": "rawHeader1",
"type": "bytes"
},
{
"internalType": "bytes",
"name": "rawHeader2",
"type": "bytes"
},
{
"internalType": "bytes",
"name": "rawHeader3",
"type": "bytes"
},
{
"internalType": "bytes",
"name": "rawHeader4",
"type": "bytes"
}
],
"name": "importHeader",
"name": "importHeaders",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
@@ -100,16 +115,31 @@
"inputs": [
{
"internalType": "bytes",
"name": "rawHeader",
"name": "rawHeader1",
"type": "bytes"
},
{
"internalType": "bytes",
"name": "rawHeader2",
"type": "bytes"
},
{
"internalType": "bytes",
"name": "rawHeader3",
"type": "bytes"
},
{
"internalType": "bytes",
"name": "rawHeader4",
"type": "bytes"
}
],
"name": "isIncompleteHeader",
"name": "isIncompleteHeaders",
"outputs": [
{
"internalType": "bool",
"internalType": "uint256",
"name": "",
"type": "bool"
"type": "uint256"
}
],
"stateMutability": "view",
File diff suppressed because one or more lines are too long
@@ -1,5 +1,5 @@
Last Change Date: 2020-07-03
Solc version: 0.6.6+commit.6c089d02
Source hash (keccak256): 0x3e6339beefe6786f4f26b408d4f727e03c6fd9630d692af9a7f6b46143fa308f
Source gist: https://github.com/svyatonik/substrate-bridge-sol/blob/1d0fa475a2ba3a70a47ed2dd870568c42ec16c8c/substrate-bridge.sol
Last Change Date: 2020-07-30
Solc version: 0.6.6+commit.6c089d02.Linux.g++
Source hash (keccak256): 0xea5d6d744f69157adc2857166792aca139c0b5b186ba89c1011358fbcad90d7e
Source gist: https://github.com/svyatonik/substrate-bridge-sol/blob/6456d3e016c95cd5e6d5e817c23e9e69e739aa78/substrate-bridge.sol
Compiler flags used (command to produce the file): `docker run -i ethereum/solc:0.6.6 --optimize --bin - < substrate-bridge.sol`
+289 -74
View File
@@ -32,12 +32,12 @@ use jsonrpsee::transport::http::HttpTransportClient;
use jsonrpsee::Client;
use parity_crypto::publickey::KeyPair;
use std::collections::{HashSet, VecDeque};
use std::collections::HashSet;
// to encode/decode contract calls
ethabi_contract::use_contract!(bridge_contract, "res/substrate-bridge-abi.json");
type Result<T> = std::result::Result<T, RpcError>;
type RpcResult<T> = std::result::Result<T, RpcError>;
/// Ethereum connection params.
#[derive(Debug, Clone)]
@@ -104,15 +104,15 @@ impl EthereumRpcClient {
#[async_trait]
impl EthereumRpc for EthereumRpcClient {
async fn estimate_gas(&self, call_request: CallRequest) -> Result<U256> {
async fn estimate_gas(&self, call_request: CallRequest) -> RpcResult<U256> {
Ok(Ethereum::estimate_gas(&self.client, call_request).await?)
}
async fn best_block_number(&self) -> Result<u64> {
async fn best_block_number(&self) -> RpcResult<u64> {
Ok(Ethereum::block_number(&self.client).await?.as_u64())
}
async fn header_by_number(&self, block_number: u64) -> Result<Header> {
async fn header_by_number(&self, block_number: u64) -> RpcResult<Header> {
let get_full_tx_objects = false;
let header = Ethereum::get_block_by_number(&self.client, block_number, get_full_tx_objects).await?;
match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() {
@@ -121,7 +121,7 @@ impl EthereumRpc for EthereumRpcClient {
}
}
async fn header_by_hash(&self, hash: H256) -> Result<Header> {
async fn header_by_hash(&self, hash: H256) -> RpcResult<Header> {
let get_full_tx_objects = false;
let header = Ethereum::get_block_by_hash(&self.client, hash, get_full_tx_objects).await?;
match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() {
@@ -130,7 +130,7 @@ impl EthereumRpc for EthereumRpcClient {
}
}
async fn header_by_number_with_transactions(&self, number: u64) -> Result<HeaderWithTransactions> {
async fn header_by_number_with_transactions(&self, number: u64) -> RpcResult<HeaderWithTransactions> {
let get_full_tx_objects = true;
let header = Ethereum::get_block_by_number_with_transactions(&self.client, number, get_full_tx_objects).await?;
@@ -147,7 +147,7 @@ impl EthereumRpc for EthereumRpcClient {
Ok(header)
}
async fn header_by_hash_with_transactions(&self, hash: H256) -> Result<HeaderWithTransactions> {
async fn header_by_hash_with_transactions(&self, hash: H256) -> RpcResult<HeaderWithTransactions> {
let get_full_tx_objects = true;
let header = Ethereum::get_block_by_hash_with_transactions(&self.client, hash, get_full_tx_objects).await?;
@@ -164,24 +164,26 @@ impl EthereumRpc for EthereumRpcClient {
Ok(header)
}
async fn transaction_by_hash(&self, hash: H256) -> Result<Option<Transaction>> {
async fn transaction_by_hash(&self, hash: H256) -> RpcResult<Option<Transaction>> {
Ok(Ethereum::transaction_by_hash(&self.client, hash).await?)
}
async fn transaction_receipt(&self, transaction_hash: H256) -> Result<Receipt> {
async fn transaction_receipt(&self, transaction_hash: H256) -> RpcResult<Receipt> {
Ok(Ethereum::get_transaction_receipt(&self.client, transaction_hash).await?)
}
async fn account_nonce(&self, address: Address) -> Result<U256> {
async fn account_nonce(&self, address: Address) -> RpcResult<U256> {
Ok(Ethereum::get_transaction_count(&self.client, address).await?)
}
async fn submit_transaction(&self, signed_raw_tx: SignedRawTx) -> Result<TransactionHash> {
async fn submit_transaction(&self, signed_raw_tx: SignedRawTx) -> RpcResult<TransactionHash> {
let transaction = Bytes(signed_raw_tx);
Ok(Ethereum::submit_transaction(&self.client, transaction).await?)
let tx_hash = Ethereum::submit_transaction(&self.client, transaction).await?;
log::trace!(target: "bridge", "Sent transaction to Ethereum node: {:?}", tx_hash);
Ok(tx_hash)
}
async fn eth_call(&self, call_transaction: CallRequest) -> Result<Bytes> {
async fn eth_call(&self, call_transaction: CallRequest) -> RpcResult<Bytes> {
Ok(Ethereum::call(&self.client, call_transaction).await?)
}
}
@@ -191,14 +193,14 @@ impl EthereumRpc for EthereumRpcClient {
#[async_trait]
pub trait EthereumHighLevelRpc: EthereumRpc {
/// Returns best Substrate block that PoA chain knows of.
async fn best_substrate_block(&self, contract_address: Address) -> Result<SubstrateHeaderId>;
async fn best_substrate_block(&self, contract_address: Address) -> RpcResult<SubstrateHeaderId>;
/// Returns true if Substrate header is known to Ethereum node.
async fn substrate_header_known(
&self,
contract_address: Address,
id: SubstrateHeaderId,
) -> Result<(SubstrateHeaderId, bool)>;
) -> RpcResult<(SubstrateHeaderId, bool)>;
/// Submits Substrate headers to Ethereum contract.
async fn submit_substrate_headers(
@@ -209,7 +211,7 @@ pub trait EthereumHighLevelRpc: EthereumRpc {
) -> SubmittedHeaders<SubstrateHeaderId, RpcError>;
/// Returns ids of incomplete Substrate headers.
async fn incomplete_substrate_headers(&self, contract_address: Address) -> Result<HashSet<SubstrateHeaderId>>;
async fn incomplete_substrate_headers(&self, contract_address: Address) -> RpcResult<HashSet<SubstrateHeaderId>>;
/// Complete Substrate header.
async fn complete_substrate_header(
@@ -218,7 +220,7 @@ pub trait EthereumHighLevelRpc: EthereumRpc {
contract_address: Address,
id: SubstrateHeaderId,
justification: GrandpaJustification,
) -> Result<SubstrateHeaderId>;
) -> RpcResult<SubstrateHeaderId>;
/// Submit ethereum transaction.
async fn submit_ethereum_transaction(
@@ -228,19 +230,19 @@ pub trait EthereumHighLevelRpc: EthereumRpc {
nonce: Option<U256>,
double_gas: bool,
encoded_call: Vec<u8>,
) -> Result<()>;
) -> RpcResult<()>;
/// Retrieve transactions receipts for given block.
async fn transaction_receipts(
&self,
id: EthereumHeaderId,
transactions: Vec<H256>,
) -> Result<(EthereumHeaderId, Vec<Receipt>)>;
) -> RpcResult<(EthereumHeaderId, Vec<Receipt>)>;
}
#[async_trait]
impl EthereumHighLevelRpc for EthereumRpcClient {
async fn best_substrate_block(&self, contract_address: Address) -> Result<SubstrateHeaderId> {
async fn best_substrate_block(&self, contract_address: Address) -> RpcResult<SubstrateHeaderId> {
let (encoded_call, call_decoder) = bridge_contract::functions::best_known_header::call();
let call_request = CallRequest {
to: Some(contract_address),
@@ -263,7 +265,7 @@ impl EthereumHighLevelRpc for EthereumRpcClient {
&self,
contract_address: Address,
id: SubstrateHeaderId,
) -> Result<(SubstrateHeaderId, bool)> {
) -> RpcResult<(SubstrateHeaderId, bool)> {
let (encoded_call, call_decoder) = bridge_contract::functions::is_known_header::call(id.1);
let call_request = CallRequest {
to: Some(contract_address),
@@ -313,7 +315,7 @@ impl EthereumHighLevelRpc for EthereumRpcClient {
.await
}
async fn incomplete_substrate_headers(&self, contract_address: Address) -> Result<HashSet<SubstrateHeaderId>> {
async fn incomplete_substrate_headers(&self, contract_address: Address) -> RpcResult<HashSet<SubstrateHeaderId>> {
let (encoded_call, call_decoder) = bridge_contract::functions::incomplete_headers::call();
let call_request = CallRequest {
to: Some(contract_address),
@@ -346,7 +348,7 @@ impl EthereumHighLevelRpc for EthereumRpcClient {
contract_address: Address,
id: SubstrateHeaderId,
justification: GrandpaJustification,
) -> Result<SubstrateHeaderId> {
) -> RpcResult<SubstrateHeaderId> {
let _ = self
.submit_ethereum_transaction(
&params,
@@ -367,7 +369,7 @@ impl EthereumHighLevelRpc for EthereumRpcClient {
nonce: Option<U256>,
double_gas: bool,
encoded_call: Vec<u8>,
) -> Result<()> {
) -> RpcResult<()> {
let nonce = if let Some(n) = nonce {
n
} else {
@@ -400,7 +402,7 @@ impl EthereumHighLevelRpc for EthereumRpcClient {
&self,
id: EthereumHeaderId,
transactions: Vec<H256>,
) -> Result<(EthereumHeaderId, Vec<Receipt>)> {
) -> RpcResult<(EthereumHeaderId, Vec<Receipt>)> {
let mut transaction_receipts = Vec::with_capacity(transactions.len());
for transaction in transactions {
let transaction_receipt = self.transaction_receipt(transaction).await?;
@@ -410,18 +412,115 @@ impl EthereumHighLevelRpc for EthereumRpcClient {
}
}
/// Max number of headers which can be sent to Solidity contract.
pub const HEADERS_BATCH: usize = 4;
/// Substrate headers to send to the Ethereum light client.
///
/// The Solidity contract can only accept a fixed number of headers in one go.
/// This struct is meant to encapsulate this limitation.
#[derive(Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct HeadersBatch {
pub header1: QueuedSubstrateHeader,
pub header2: Option<QueuedSubstrateHeader>,
pub header3: Option<QueuedSubstrateHeader>,
pub header4: Option<QueuedSubstrateHeader>,
}
impl HeadersBatch {
/// Create new headers from given header & ids collections.
///
/// This method will pop `HEADERS_BATCH` items from both collections
/// and construct `Headers` object and a vector of `SubstrateheaderId`s.
pub fn pop_from(
headers: &mut Vec<QueuedSubstrateHeader>,
ids: &mut Vec<SubstrateHeaderId>,
) -> Result<(Self, Vec<SubstrateHeaderId>), ()> {
if headers.len() != ids.len() {
log::error!(target: "bridge", "Collection size mismatch ({} vs {})", headers.len(), ids.len());
return Err(());
}
let header1 = headers.pop().ok_or(())?;
let header2 = headers.pop();
let header3 = headers.pop();
let header4 = headers.pop();
let mut submitting_ids = Vec::with_capacity(HEADERS_BATCH);
for _ in 0..HEADERS_BATCH {
submitting_ids.extend(ids.pop().iter());
}
Ok((
Self {
header1,
header2,
header3,
header4,
},
submitting_ids,
))
}
/// Returns unified array of headers.
///
/// The first element is always `Some`.
fn headers(&self) -> [Option<&QueuedSubstrateHeader>; HEADERS_BATCH] {
[
Some(&self.header1),
self.header2.as_ref(),
self.header3.as_ref(),
self.header4.as_ref(),
]
}
/// Encodes all headers. If header is not present an empty vector will be returned.
pub fn encode(&self) -> [Vec<u8>; HEADERS_BATCH] {
let encode = |h: &QueuedSubstrateHeader| h.header().encode();
let headers = self.headers();
[
headers[0].map(encode).unwrap_or_default(),
headers[1].map(encode).unwrap_or_default(),
headers[2].map(encode).unwrap_or_default(),
headers[3].map(encode).unwrap_or_default(),
]
}
/// Returns number of contained headers.
pub fn len(&self) -> usize {
let is_set = |h: &Option<&QueuedSubstrateHeader>| if h.is_some() { 1 } else { 0 };
self.headers().iter().map(is_set).sum()
}
/// Remove headers starting from `idx` (0-based) from this collection.
///
/// The collection will be left with `[0, idx)` headers.
/// Returns `Err` when `idx == 0`, since `Headers` must contain at least one header,
/// or when `idx > HEADERS_BATCH`.
pub fn split_off(&mut self, idx: usize) -> Result<(), ()> {
if idx == 0 || idx > HEADERS_BATCH {
return Err(());
}
let mut vals: [_; HEADERS_BATCH] = [&mut None, &mut self.header2, &mut self.header3, &mut self.header4];
for val in vals.iter_mut().skip(idx) {
**val = None;
}
Ok(())
}
}
/// Substrate headers submitter API.
#[async_trait]
trait HeadersSubmitter {
/// Returns Ok(true) if not-yet-imported header is incomplete.
/// Returns Ok(false) if not-yet-imported header is complete.
/// Returns Ok(0) if all given not-yet-imported headers are complete.
/// Returns Ok(index != 0) where index is 1-based index of first header that is incomplete.
///
/// Returns Err(()) if contract has rejected header. This probably means
/// that the header is already imported by the contract.
async fn is_header_incomplete(&self, header: &QueuedSubstrateHeader) -> Result<bool>;
/// Returns Err(()) if contract has rejected headers. This means that the contract is
/// unable to import first header (e.g. it may already be imported).
async fn is_headers_incomplete(&self, headers: &HeadersBatch) -> RpcResult<usize>;
/// Submit given header to Ethereum node.
async fn submit_header(&mut self, header: QueuedSubstrateHeader) -> Result<()>;
/// Submit given headers to Ethereum node.
async fn submit_headers(&mut self, headers: HeadersBatch) -> RpcResult<()>;
}
/// Implementation of Substrate headers submitter that sends headers to running Ethereum node.
@@ -434,9 +533,9 @@ struct EthereumHeadersSubmitter {
#[async_trait]
impl HeadersSubmitter for EthereumHeadersSubmitter {
async fn is_header_incomplete(&self, header: &QueuedSubstrateHeader) -> Result<bool> {
let (encoded_call, call_decoder) =
bridge_contract::functions::is_incomplete_header::call(header.header().encode());
async fn is_headers_incomplete(&self, headers: &HeadersBatch) -> RpcResult<usize> {
let [h1, h2, h3, h4] = headers.encode();
let (encoded_call, call_decoder) = bridge_contract::functions::is_incomplete_headers::call(h1, h2, h3, h4);
let call_request = CallRequest {
to: Some(self.contract_address),
data: Some(encoded_call.into()),
@@ -444,12 +543,16 @@ impl HeadersSubmitter for EthereumHeadersSubmitter {
};
let call_result = self.client.eth_call(call_request).await?;
let is_incomplete = call_decoder.decode(&call_result.0)?;
let incomplete_index: U256 = call_decoder.decode(&call_result.0)?;
if incomplete_index > HEADERS_BATCH.into() {
return Err(RpcError::Ethereum(EthereumNodeError::InvalidIncompleteIndex));
}
Ok(is_incomplete)
Ok(incomplete_index.low_u32() as _)
}
async fn submit_header(&mut self, header: QueuedSubstrateHeader) -> Result<()> {
async fn submit_headers(&mut self, headers: HeadersBatch) -> RpcResult<()> {
let [h1, h2, h3, h4] = headers.encode();
let result = self
.client
.submit_ethereum_transaction(
@@ -457,7 +560,7 @@ impl HeadersSubmitter for EthereumHeadersSubmitter {
Some(self.contract_address),
Some(self.nonce),
false,
bridge_contract::functions::import_header::encode_input(header.header().encode()),
bridge_contract::functions::import_headers::encode_input(h1, h2, h3, h4),
)
.await;
@@ -472,16 +575,23 @@ impl HeadersSubmitter for EthereumHeadersSubmitter {
/// Submit multiple Substrate headers.
async fn submit_substrate_headers(
mut header_submitter: impl HeadersSubmitter,
headers: Vec<QueuedSubstrateHeader>,
mut headers: Vec<QueuedSubstrateHeader>,
) -> SubmittedHeaders<SubstrateHeaderId, RpcError> {
let mut ids = headers.iter().map(|header| header.id()).collect::<VecDeque<_>>();
let mut submitted_headers = SubmittedHeaders::default();
for header in headers {
let id = ids.pop_front().expect("both collections have same size; qed");
let mut ids = headers.iter().map(|header| header.id()).rev().collect::<Vec<_>>();
headers.reverse();
while !headers.is_empty() {
let (headers, submitting_ids) =
HeadersBatch::pop_from(&mut headers, &mut ids).expect("Headers and ids are not empty; qed");
submitted_headers.fatal_error =
submit_substrate_header(&mut header_submitter, &mut submitted_headers, id, header).await;
submit_substrate_headers_batch(&mut header_submitter, &mut submitted_headers, submitting_ids, headers)
.await;
if submitted_headers.fatal_error.is_some() {
ids.reverse();
submitted_headers.rejected.extend(ids);
break;
}
@@ -490,28 +600,31 @@ async fn submit_substrate_headers(
submitted_headers
}
/// Submit single Substrate header.
async fn submit_substrate_header(
/// Submit 4 Substrate headers in single PoA transaction.
async fn submit_substrate_headers_batch(
header_submitter: &mut impl HeadersSubmitter,
submitted_headers: &mut SubmittedHeaders<SubstrateHeaderId, RpcError>,
id: SubstrateHeaderId,
header: QueuedSubstrateHeader,
mut ids: Vec<SubstrateHeaderId>,
mut headers: HeadersBatch,
) -> Option<RpcError> {
// if parent of this header is either incomplete, or rejected, we assume that contract
debug_assert_eq!(ids.len(), headers.len(),);
// if parent of first header is either incomplete, or rejected, we assume that contract
// will reject this header as well
let parent_id = header.parent_id();
let parent_id = headers.header1.parent_id();
if submitted_headers.rejected.contains(&parent_id) || submitted_headers.incomplete.contains(&parent_id) {
submitted_headers.rejected.push(id);
submitted_headers.rejected.extend(ids);
return None;
}
// check if this header is incomplete
let is_header_incomplete = match header_submitter.is_header_incomplete(&header).await {
Ok(true) => true,
Ok(false) => false,
// check if headers are incomplete
let incomplete_header_index = match header_submitter.is_headers_incomplete(&headers).await {
// All headers valid
Ok(0) => None,
Ok(incomplete_header_index) => Some(incomplete_header_index),
Err(error) => {
// contract has rejected this header => we do not want to submit it
submitted_headers.rejected.push(id);
// contract has rejected all headers => we do not want to submit it
submitted_headers.rejected.extend(ids);
if error.is_connection_error() {
return Some(error);
} else {
@@ -520,17 +633,30 @@ async fn submit_substrate_header(
}
};
// submit header and update submitted headers
match header_submitter.submit_header(header).await {
// Modify `ids` and `headers` to only contain values that are going to be accepted.
let rejected = if let Some(idx) = incomplete_header_index {
let len = std::cmp::min(idx, ids.len());
headers
.split_off(len)
.expect("len > 0, the case where all headers are valid is converted to None; qed");
ids.split_off(len)
} else {
Vec::new()
};
let submitted = ids;
let submit_result = header_submitter.submit_headers(headers).await;
match submit_result {
Ok(_) => {
submitted_headers.submitted.push(id);
if is_header_incomplete {
submitted_headers.incomplete.push(id);
if incomplete_header_index.is_some() {
submitted_headers.incomplete.extend(submitted.iter().last().cloned());
}
submitted_headers.submitted.extend(submitted);
submitted_headers.rejected.extend(rejected);
None
}
Err(error) => {
submitted_headers.rejected.push(id);
submitted_headers.rejected.extend(submitted);
submitted_headers.rejected.extend(rejected);
Some(error)
}
}
@@ -549,16 +675,16 @@ mod tests {
#[async_trait]
impl HeadersSubmitter for TestHeadersSubmitter {
async fn is_header_incomplete(&self, header: &QueuedSubstrateHeader) -> Result<bool> {
if self.incomplete.iter().any(|i| i.0 == header.id().0) {
Ok(true)
async fn is_headers_incomplete(&self, headers: &HeadersBatch) -> RpcResult<usize> {
if self.incomplete.iter().any(|i| i.0 == headers.header1.id().0) {
Ok(1)
} else {
Ok(false)
Ok(0)
}
}
async fn submit_header(&mut self, header: QueuedSubstrateHeader) -> Result<()> {
if self.failed.iter().any(|i| i.0 == header.id().0) {
async fn submit_headers(&mut self, headers: HeadersBatch) -> RpcResult<()> {
if self.failed.iter().any(|i| i.0 == headers.header1.id().0) {
Err(RpcError::Ethereum(EthereumNodeError::InvalidSubstrateBlockNumber))
} else {
Ok(())
@@ -600,13 +726,102 @@ mod tests {
let submitted_headers = async_std::task::block_on(submit_substrate_headers(
TestHeadersSubmitter {
incomplete: vec![],
failed: vec![header(6).id()],
failed: vec![header(9).id()],
},
vec![header(5), header(6), header(7)],
vec![
header(5),
header(6),
header(7),
header(8),
header(9),
header(10),
header(11),
],
));
assert_eq!(submitted_headers.submitted, vec![header(5).id()]);
assert_eq!(
submitted_headers.submitted,
vec![header(5).id(), header(6).id(), header(7).id(), header(8).id()]
);
assert_eq!(submitted_headers.incomplete, vec![]);
assert_eq!(submitted_headers.rejected, vec![header(6).id(), header(7).id()]);
assert_eq!(
submitted_headers.rejected,
vec![header(9).id(), header(10).id(), header(11).id(),]
);
assert!(submitted_headers.fatal_error.is_some());
}
fn headers_batch() -> HeadersBatch {
let mut init_headers = vec![header(1), header(2), header(3), header(4), header(5)];
init_headers.reverse();
let mut init_ids = init_headers.iter().map(|h| h.id()).collect();
let (headers, ids) = HeadersBatch::pop_from(&mut init_headers, &mut init_ids).unwrap();
assert_eq!(init_headers, vec![header(5)]);
assert_eq!(init_ids, vec![header(5).id()]);
assert_eq!(
ids,
vec![header(1).id(), header(2).id(), header(3).id(), header(4).id()]
);
headers
}
#[test]
fn headers_batch_len() {
let headers = headers_batch();
assert_eq!(headers.len(), 4);
}
#[test]
fn headers_batch_encode() {
let headers = headers_batch();
assert_eq!(
headers.encode(),
[
header(1).header().encode(),
header(2).header().encode(),
header(3).header().encode(),
header(4).header().encode(),
]
);
}
#[test]
fn headers_batch_split_off() {
// given
let mut headers = headers_batch();
// when
assert!(headers.split_off(0).is_err());
assert_eq!(headers.header1, header(1));
assert!(headers.header2.is_some());
assert!(headers.header3.is_some());
assert!(headers.header4.is_some());
// when
let mut h = headers.clone();
h.split_off(1).unwrap();
assert!(h.header2.is_none());
assert!(h.header3.is_none());
assert!(h.header4.is_none());
// when
let mut h = headers.clone();
h.split_off(2).unwrap();
assert!(h.header2.is_some());
assert!(h.header3.is_none());
assert!(h.header4.is_none());
// when
let mut h = headers.clone();
h.split_off(3).unwrap();
assert!(h.header2.is_some());
assert!(h.header3.is_some());
assert!(h.header4.is_none());
// when
let mut h = headers;
h.split_off(4).unwrap();
assert!(h.header2.is_some());
assert!(h.header3.is_some());
assert!(h.header4.is_some());
}
}
+1 -1
View File
@@ -157,7 +157,7 @@ async fn run_loop_iteration<P: TransactionProofPipeline>(
) -> Result<(), ()> {
let best_finalized_header_id = match target_client.best_finalized_header_id().await {
Ok(best_finalized_header_id) => {
log::trace!(
log::debug!(
target: "bridge",
"Got best finalized {} block from {} node: {:?}",
P::SOURCE_NAME,
@@ -100,6 +100,8 @@ pub enum EthereumNodeError {
/// An invalid Substrate block number was received from
/// an Ethereum node.
InvalidSubstrateBlockNumber,
/// An invalid index has been received from an Ethereum node.
InvalidIncompleteIndex,
}
impl ToString for EthereumNodeError {
@@ -112,6 +114,7 @@ impl ToString for EthereumNodeError {
}
Self::IncompleteTransaction => "Incomplete Ethereum Transaction (missing required field - raw)".to_string(),
Self::InvalidSubstrateBlockNumber => "Received an invalid Substrate block from Ethereum Node".to_string(),
Self::InvalidIncompleteIndex => "Received an invalid incomplete index from Ethereum Node".to_string(),
}
}
}
+21 -19
View File
@@ -39,7 +39,7 @@ const ETH_API_BEST_FINALIZED_BLOCK: &str = "RialtoHeaderApi_finalized_block";
const EXCH_API_FILTER_TRANSACTION_PROOF: &str = "RialtoCurrencyExchangeApi_filter_transaction_proof";
const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
type Result<T> = std::result::Result<T, RpcError>;
type RpcResult<T> = std::result::Result<T, RpcError>;
type GrandpaAuthorityList = Vec<u8>;
/// Substrate connection params.
@@ -93,7 +93,7 @@ pub struct SubstrateRpcClient {
impl SubstrateRpcClient {
/// Returns client that is able to call RPCs on Substrate node.
pub async fn new(params: SubstrateConnectionParams, instance: Box<dyn BridgeInstance>) -> Result<Self> {
pub async fn new(params: SubstrateConnectionParams, instance: Box<dyn BridgeInstance>) -> RpcResult<Self> {
let uri = format!("http://{}:{}", params.host, params.port);
let transport = HttpTransportClient::new(&uri);
let raw_client = RawClient::new(transport);
@@ -112,32 +112,32 @@ impl SubstrateRpcClient {
#[async_trait]
impl SubstrateRpc for SubstrateRpcClient {
async fn best_header(&self) -> Result<SubstrateHeader> {
async fn best_header(&self) -> RpcResult<SubstrateHeader> {
Ok(Substrate::chain_get_header(&self.client, None).await?)
}
async fn get_block(&self, block_hash: Option<Hash>) -> Result<SignedSubstrateBlock> {
async fn get_block(&self, block_hash: Option<Hash>) -> RpcResult<SignedSubstrateBlock> {
Ok(Substrate::chain_get_block(&self.client, block_hash).await?)
}
async fn header_by_hash(&self, block_hash: Hash) -> Result<SubstrateHeader> {
async fn header_by_hash(&self, block_hash: Hash) -> RpcResult<SubstrateHeader> {
Ok(Substrate::chain_get_header(&self.client, block_hash).await?)
}
async fn block_hash_by_number(&self, number: Number) -> Result<Hash> {
async fn block_hash_by_number(&self, number: Number) -> RpcResult<Hash> {
Ok(Substrate::chain_get_block_hash(&self.client, number).await?)
}
async fn header_by_number(&self, block_number: Number) -> Result<SubstrateHeader> {
async fn header_by_number(&self, block_number: Number) -> RpcResult<SubstrateHeader> {
let block_hash = Self::block_hash_by_number(self, block_number).await?;
Ok(Self::header_by_hash(self, block_hash).await?)
}
async fn next_account_index(&self, account: node_primitives::AccountId) -> Result<node_primitives::Index> {
async fn next_account_index(&self, account: node_primitives::AccountId) -> RpcResult<node_primitives::Index> {
Ok(Substrate::system_account_next_index(&self.client, account).await?)
}
async fn best_ethereum_block(&self) -> Result<EthereumHeaderId> {
async fn best_ethereum_block(&self) -> RpcResult<EthereumHeaderId> {
let call = ETH_API_BEST_BLOCK.to_string();
let data = Bytes(Vec::new());
@@ -148,7 +148,7 @@ impl SubstrateRpc for SubstrateRpcClient {
Ok(best_header_id)
}
async fn best_ethereum_finalized_block(&self) -> Result<EthereumHeaderId> {
async fn best_ethereum_finalized_block(&self) -> RpcResult<EthereumHeaderId> {
let call = ETH_API_BEST_FINALIZED_BLOCK.to_string();
let data = Bytes(Vec::new());
@@ -159,7 +159,7 @@ impl SubstrateRpc for SubstrateRpcClient {
Ok(best_header_id)
}
async fn ethereum_receipts_required(&self, header: SubstrateEthereumHeader) -> Result<bool> {
async fn ethereum_receipts_required(&self, header: SubstrateEthereumHeader) -> RpcResult<bool> {
let call = ETH_API_IMPORT_REQUIRES_RECEIPTS.to_string();
let data = Bytes(header.encode());
@@ -175,7 +175,7 @@ impl SubstrateRpc for SubstrateRpcClient {
// But when we read the best header from Substrate next time, we will know that
// there's a better header. This Orphan will either be marked as synced, or
// eventually pruned.
async fn ethereum_header_known(&self, header_id: EthereumHeaderId) -> Result<bool> {
async fn ethereum_header_known(&self, header_id: EthereumHeaderId) -> RpcResult<bool> {
let call = ETH_API_IS_KNOWN_BLOCK.to_string();
let data = Bytes(header_id.1.encode());
@@ -185,11 +185,13 @@ impl SubstrateRpc for SubstrateRpcClient {
Ok(is_known_block)
}
async fn submit_extrinsic(&self, transaction: Bytes) -> Result<Hash> {
Ok(Substrate::author_submit_extrinsic(&self.client, transaction).await?)
async fn submit_extrinsic(&self, transaction: Bytes) -> RpcResult<Hash> {
let tx_hash = Substrate::author_submit_extrinsic(&self.client, transaction).await?;
log::trace!(target: "bridge", "Sent transaction to Substrate node: {:?}", tx_hash);
Ok(tx_hash)
}
async fn grandpa_authorities_set(&self, block: Hash) -> Result<GrandpaAuthorityList> {
async fn grandpa_authorities_set(&self, block: Hash) -> RpcResult<GrandpaAuthorityList> {
let call = SUB_API_GRANDPA_AUTHORITIES.to_string();
let data = Bytes(Vec::new());
@@ -312,13 +314,13 @@ pub trait SubmitEthereumExchangeTransactionProof: SubstrateRpc {
async fn verify_exchange_transaction_proof(
&self,
proof: bridge_node_runtime::exchange::EthereumTransactionInclusionProof,
) -> Result<bool>;
) -> RpcResult<bool>;
/// Submits Ethereum exchange transaction proof to Substrate runtime.
async fn submit_exchange_transaction_proof(
&self,
params: SubstrateSigningParams,
proof: bridge_node_runtime::exchange::EthereumTransactionInclusionProof,
) -> Result<()>;
) -> RpcResult<()>;
}
#[async_trait]
@@ -326,7 +328,7 @@ impl SubmitEthereumExchangeTransactionProof for SubstrateRpcClient {
async fn verify_exchange_transaction_proof(
&self,
proof: bridge_node_runtime::exchange::EthereumTransactionInclusionProof,
) -> Result<bool> {
) -> RpcResult<bool> {
let call = EXCH_API_FILTER_TRANSACTION_PROOF.to_string();
let data = Bytes(proof.encode());
@@ -340,7 +342,7 @@ impl SubmitEthereumExchangeTransactionProof for SubstrateRpcClient {
&self,
params: SubstrateSigningParams,
proof: bridge_node_runtime::exchange::EthereumTransactionInclusionProof,
) -> Result<()> {
) -> RpcResult<()> {
let account_id = params.signer.public().as_array_ref().clone().into();
let nonce = self.next_account_index(account_id).await?;
+65
View File
@@ -91,6 +91,8 @@ pub struct HeadersSync<P: HeadersSyncPipeline> {
target_best_header: Option<HeaderIdOf<P>>,
/// Headers queue.
headers: QueuedHeaders<P>,
/// Pause headers submission.
pause_submit: bool,
}
impl<P: HeadersSyncPipeline> HeadersSync<P> {
@@ -101,6 +103,7 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
params,
source_best_number: None,
target_best_header: None,
pause_submit: false,
}
}
@@ -191,6 +194,11 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
/// Select headers that need to be submitted to the target node.
pub fn select_headers_to_submit(&self, stalled: bool) -> Option<Vec<&QueuedHeader<P>>> {
// maybe we have paused new headers submit?
if self.pause_submit {
return None;
}
// if we operate in backup mode, we only submit headers when sync has stalled
if self.params.target_tx_mode == TargetTransactionMode::Backup && !stalled {
return None;
@@ -260,14 +268,40 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
// finally remember the best header itself
self.target_best_header = Some(best_header);
// we are ready to submit headers again
if self.pause_submit {
log::debug!(
target: "bridge",
"Ready to submit {} headers to {} node again!",
P::SOURCE_NAME,
P::TARGET_NAME,
);
self.pause_submit = false;
}
true
}
/// Pause headers submit until best header will be updated on target node.
pub fn pause_submit(&mut self) {
log::debug!(
target: "bridge",
"Stopping submitting {} headers to {} node. Waiting for {} submitted headers to be accepted",
P::SOURCE_NAME,
P::TARGET_NAME,
self.headers.headers_in_status(HeaderStatus::Submitted),
);
self.pause_submit = true;
}
/// Restart synchronization.
pub fn restart(&mut self) {
self.source_best_number = None;
self.target_best_header = None;
self.headers.clear();
self.pause_submit = false;
}
}
@@ -481,4 +515,35 @@ pub mod tests {
// ensure that headers are not submitted when sync is stalled
assert_eq!(eth_sync.select_headers_to_submit(true), Some(vec![&header(101)]));
}
#[test]
fn does_not_select_new_headers_to_submit_when_submit_is_paused() {
let mut eth_sync = HeadersSync::new(default_sync_params());
eth_sync.params.max_headers_in_submitted_status = 1;
// ethereum reports best header #102 and substrate is at #100
eth_sync.source_best_header_number_response(102);
eth_sync.target_best_header_response(id(100));
// let's prepare #101 and #102 for submitting
eth_sync.headers.header_response(header(101).header().clone());
eth_sync.headers.maybe_extra_response(&id(101), false);
eth_sync.headers.header_response(header(102).header().clone());
eth_sync.headers.maybe_extra_response(&id(102), false);
// when submit is not paused, we're ready to submit #101
assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(101)]));
// when submit is paused, we're not ready to submit anything
eth_sync.pause_submit();
assert_eq!(eth_sync.select_headers_to_submit(false), None);
// if best header on substrate node isn't updated, we still not submitting anything
eth_sync.target_best_header_response(id(100));
assert_eq!(eth_sync.select_headers_to_submit(false), None);
// but after it is actually updated, we are ready to submit
eth_sync.target_best_header_response(id(101));
assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(102)]));
}
}
+13 -3
View File
@@ -266,8 +266,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
false => {
log::info!(
target: "bridge",
"Possible {} fork detected. Restarting {} headers synchronization.",
P::TARGET_NAME,
"Sync has stalled. Restarting {} headers synchronization.",
P::SOURCE_NAME,
);
stall_countdown = None;
@@ -308,16 +307,21 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
// following line helps Rust understand the type of `submitted_headers` :/
let submitted_headers: SubmittedHeaders<HeaderIdOf<P>, TC::Error> = submitted_headers;
let submitted_headers_str = format!("{}", submitted_headers);
let all_headers_rejected = submitted_headers.submitted.is_empty()
&& submitted_headers.incomplete.is_empty();
let has_submitted_headers = sync.headers().headers_in_status(HeaderStatus::Submitted) != 0;
let maybe_fatal_error = match submitted_headers.fatal_error {
Some(fatal_error) => Err(StringifiedMaybeConnectionError::new(
fatal_error.is_connection_error(),
format!("{:?}", fatal_error),
)),
None if submitted_headers.submitted.is_empty() && submitted_headers.incomplete.is_empty() =>
None if all_headers_rejected && !has_submitted_headers =>
Err(StringifiedMaybeConnectionError::new(false, "All headers were rejected".into())),
None => Ok(()),
};
let no_fatal_error = maybe_fatal_error.is_ok();
target_client_is_online = process_future_result(
maybe_fatal_error,
&mut target_retry_backoff,
@@ -331,6 +335,12 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
sync.headers_mut().headers_submitted(submitted_headers.submitted);
sync.headers_mut().add_incomplete_headers(submitted_headers.incomplete);
// when there's no fatal error, but node has rejected all our headers we may
// want to pause until our submitted headers will be accepted
if no_fatal_error && all_headers_rejected && has_submitted_headers {
sync.pause_submit();
}
},
target_complete_header_result = target_complete_header_future => {
target_client_is_online = process_future_result(