Process finality proofs in solidity PoA -> Substrate contract (#69)

* solidity contract

* continue

* upd

* cargo update

* fixes

* ehtereum_headers -> headers

* extracted some common stuff

* ethereum_sync.rs -> sync.rs

* make sync generic

* continue extracting

* continue

* add eth-contract argument

* continue

* some fixes

* contract v2

* continue

* more fixes

* more fixes

* deal with duplicated params

* removed multiple call_rpc variants

* bail_on_error!()

* fn submit_ethereum_transaction

* more fixes

* cargo fmt --all

* fix

* bail_on_arg_error!()

* fix

* fix

* remove async_extra stuff

* start work on finality builtin

remove async_extra stuff

continue

continue

local testnet (Alice + Bob) for node

* added TODO

* substrate-bridge.json -> substrate-bridge-abi.json

* get rid of substrate transactions hashes

* get rid of ethereum transactions hashes

* extracted contract bytecode to separate file

* cargo fmt --all

* avoid duplicate import in contracts

* removed Default::default()

* swapped configurations for sub2eth && eth2sub

* fix compilation

* do not double gas limit when submitting Substrate headers

* fix finality storage

* at least 1 validator required

* shift_session_manager_works

* cargo fmt --all

* solidity contract removed

* consts

* extracted solc compilation details to separate file

* removed (obsolete in future Vec<u8> justification)

* fixed cli option description

* fix typos

* fix grumble

* extracted constants

* log decoded header

* new substrate version + actually verify justification

* intermediate cargo fmt --all

* comments

* disable completion data resubmission

* increased timeouts + _MS -> Duration

* forget completion data after submission

* builtin tests

* headers tests

* cargo fmt --all

* update contract

* Update relays/ethereum/src/ethereum_sync_loop.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update relays/ethereum/src/ethereum_sync_loop.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* added docs

* OwnedFutureOutput

* more docs fixes

* cargo fmt --all

* encode headers

* consts + docs

* aliases again

* cargo fmt --all

* Update relays/ethereum/src/ethereum_sync_loop.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update relays/ethereum/src/ethereum_sync_loop.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Use Duration::from_secs() instead of from_millis()

* grumbles

* Update relays/ethereum/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/ethereum/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* incomplete_headers_are_still_incomplete_after_advance

* add hex-encoded headers to substrate_header_without_signal_parsed

* cargo fmt --all

* Update relays/ethereum/src/sync_loop.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/ethereum/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/ethereum/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/ethereum/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/ethereum/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/ethereum/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/ethereum/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/ethereum/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* added comments on Extra and Completion

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2020-05-13 00:09:26 +03:00
committed by Bastian Köcher
parent 9496303aff
commit c9e81e48b5
14 changed files with 1190 additions and 214 deletions
@@ -10,11 +10,36 @@ edition = "2018"
# General dependencies
codec = { package = "parity-scale-codec", version = "1.0.0" }
finality-grandpa = { version = "0.12.2", features = ["derive-codec"] }
sp-blockchain = "2.0.0-alpha.5"
sp-finality-grandpa = "2.0.0-alpha.5"
sp-runtime = "2.0.0-alpha.5"
ethereum-types = "0.9.1"
# Runtime/chain specific dependencies
bridge-node-runtime = { path = "../../../bin/node/runtime" }
[dependencies.sp-blockchain]
version = "2.0.0-alpha.6"
rev = "c13ad41634d0bd7cf07897c2aa062b917d520520"
git = "https://github.com/paritytech/substrate/"
[dependencies.sp-finality-grandpa]
version = "2.0.0-alpha.6"
rev = "c13ad41634d0bd7cf07897c2aa062b917d520520"
git = "https://github.com/paritytech/substrate/"
[dependencies.sp-runtime]
version = "2.0.0-alpha.6"
rev = "c13ad41634d0bd7cf07897c2aa062b917d520520"
git = "https://github.com/paritytech/substrate/"
[dependencies.sc-finality-grandpa]
version = "0.8.0-dev"
rev = "c13ad41634d0bd7cf07897c2aa062b917d520520"
git = "https://github.com/paritytech/substrate/"
[dev-dependencies]
hex = "0.4"
[dev-dependencies.sp-core]
version = "2.0.0-alpha.6"
rev = "c13ad41634d0bd7cf07897c2aa062b917d520520"
git = "https://github.com/paritytech/substrate/"
@@ -14,17 +14,21 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use bridge_node_runtime::{BlockNumber, Hash, Header as RuntimeHeader};
use codec::Decode;
use bridge_node_runtime::{Block, BlockNumber, Hash, Header as RuntimeHeader};
use codec::{Decode, Encode};
use ethereum_types::U256;
use sp_blockchain::Error as ClientError;
use sp_finality_grandpa::{AuthorityList, ConsensusLog, GRANDPA_ENGINE_ID};
/// Builtin errors.
#[derive(Debug)]
pub enum Error {
/// Failed to decode block number.
BlockNumberDecode,
/// Failed to decode Substrate header.
HeaderDecode(codec::Error),
/// Failed to decode best voters set.
BestVotersDecode(codec::Error),
BestSetDecode(codec::Error),
/// Failed to decode finality proof.
FinalityProofDecode(codec::Error),
/// Failed to verify justification.
@@ -32,7 +36,7 @@ pub enum Error {
}
/// Substrate header.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct Header {
/// Header hash.
pub hash: Hash,
@@ -45,7 +49,7 @@ pub struct Header {
}
/// GRANDPA validators set change signal.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct ValidatorsSetSignal {
/// Signal delay.
pub delay: BlockNumber,
@@ -53,6 +57,20 @@ pub struct ValidatorsSetSignal {
pub validators: Vec<u8>,
}
/// Convert from U256 to BlockNumber. Fails if `U256` value isn't fitting within `BlockNumber`
/// limits (the runtime referenced by this module uses u32 as `BlockNumber`).
pub fn to_substrate_block_number(number: U256) -> Result<BlockNumber, Error> {
match number == number.low_u32().into() {
true => Ok(number.low_u32()),
false => Err(Error::BlockNumberDecode),
}
}
/// Convert from BlockNumber to U256.
pub fn from_substrate_block_number(number: BlockNumber) -> Result<U256, Error> {
Ok(U256::from(number as u64))
}
/// Parse Substrate header.
pub fn parse_substrate_header(raw_header: &[u8]) -> Result<Header, Error> {
RuntimeHeader::decode(&mut &raw_header[..])
@@ -60,20 +78,243 @@ pub fn parse_substrate_header(raw_header: &[u8]) -> Result<Header, Error> {
hash: header.hash(),
parent_hash: header.parent_hash,
number: header.number,
signal: None, // TODO: parse me
signal: sp_runtime::traits::Header::digest(&header)
.log(|log| {
log.as_consensus().and_then(|(engine_id, log)| {
if engine_id == GRANDPA_ENGINE_ID {
Some(log)
} else {
None
}
})
})
.and_then(|log| ConsensusLog::decode(&mut &log[..]).ok())
.and_then(|log| match log {
ConsensusLog::ScheduledChange(scheduled_change) => Some(ValidatorsSetSignal {
delay: scheduled_change.delay,
validators: scheduled_change.next_authorities.encode(),
}),
_ => None,
}),
})
.map_err(Error::HeaderDecode)
}
/// Verify GRANDPA finality proof.
pub fn verify_substrate_finality_proof(
_best_set_id: u64,
_raw_best_voters: &[u8],
_raw_best_header: &[u8],
_raw_headers: &[&[u8]],
_raw_finality_proof: &[u8],
) -> Result<(usize, usize), Error> {
Err(Error::JustificationVerify(ClientError::Msg(
"Not yet implemented".into(),
))) // TODO: implement me
finality_target_number: BlockNumber,
finality_target_hash: Hash,
best_set_id: u64,
raw_best_set: &[u8],
raw_finality_proof: &[u8],
) -> Result<(), Error> {
let best_set = AuthorityList::decode(&mut &raw_best_set[..]).map_err(Error::BestSetDecode)?;
sc_finality_grandpa::GrandpaJustification::<Block>::decode_and_verify_finalizes(
&raw_finality_proof,
(finality_target_hash, finality_target_number),
best_set_id,
&best_set.into_iter().collect(),
)
.map_err(Error::JustificationVerify)
.map(|_| ())
}
#[cfg(test)]
mod tests {
use super::*;
use bridge_node_runtime::DigestItem;
use sp_core::crypto::Public;
use sp_finality_grandpa::{AuthorityId, ScheduledChange};
use sp_runtime::generic::Digest;
#[test]
fn to_substrate_block_number_succeeds() {
assert_eq!(to_substrate_block_number(U256::zero()).unwrap(), 0);
assert_eq!(
to_substrate_block_number(U256::from(std::u32::MAX as u64)).unwrap(),
0xFFFFFFFF
);
}
#[test]
fn to_substrate_block_number_fails() {
assert!(matches!(
to_substrate_block_number(U256::from(std::u32::MAX as u64 + 1)),
Err(Error::BlockNumberDecode)
));
}
#[test]
fn from_substrate_block_number_succeeds() {
assert_eq!(from_substrate_block_number(0).unwrap(), U256::zero());
assert_eq!(
from_substrate_block_number(std::u32::MAX).unwrap(),
U256::from(std::u32::MAX)
);
}
#[test]
fn substrate_header_without_signal_parsed() {
let raw_header = RuntimeHeader {
parent_hash: [0u8; 32].into(),
number: 0,
state_root: "b2fc47904df5e355c6ab476d89fbc0733aeddbe302f0b94ba4eea9283f7e89e7"
.parse()
.unwrap(),
extrinsics_root: "03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314"
.parse()
.unwrap(),
digest: Default::default(),
}
.encode();
assert_eq!(
raw_header,
hex::decode("000000000000000000000000000000000000000000000000000000000000000000b2fc47904df5e355c6ab476d89fbc0733aeddbe302f0b94ba4eea9283f7e89e703170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c11131400").unwrap(),
);
assert_eq!(
parse_substrate_header(&raw_header).unwrap(),
Header {
hash: "afbbeb92bf6ff14f60bdef0aa89f043dd403659ae82665238810ace0d761f6d0"
.parse()
.unwrap(),
parent_hash: Default::default(),
number: 0,
signal: None,
},
);
}
#[test]
fn substrate_header_with_signal_parsed() {
let authorities = vec![
(AuthorityId::from_slice(&[1; 32]), 101),
(AuthorityId::from_slice(&[3; 32]), 103),
];
let mut digest = Digest::default();
digest.push(DigestItem::Consensus(
GRANDPA_ENGINE_ID,
ConsensusLog::ScheduledChange(ScheduledChange {
next_authorities: authorities.clone(),
delay: 8,
})
.encode(),
));
let raw_header = RuntimeHeader {
parent_hash: "c0ac300d4005141ea690f3df593e049739c227316eb7f05052f3ee077388b68b"
.parse()
.unwrap(),
number: 8,
state_root: "822d6b412033aa9ac8e1722918eec5f25633529225754b3d4149982f5cacd4aa"
.parse()
.unwrap(),
extrinsics_root: "e7b07c0ce2799416ce7877b9cefc7f596bea5e8813bb2a0abf760414073ca928"
.parse()
.unwrap(),
digest,
}
.encode();
assert_eq!(
raw_header,
hex::decode("c0ac300d4005141ea690f3df593e049739c227316eb7f05052f3ee077388b68b20822d6b412033aa9ac8e1722918eec5f25633529225754b3d4149982f5cacd4aae7b07c0ce2799416ce7877b9cefc7f596bea5e8813bb2a0abf760414073ca928040446524e4b59010108010101010101010101010101010101010101010101010101010101010101010165000000000000000303030303030303030303030303030303030303030303030303030303030303670000000000000008000000").unwrap(),
);
assert_eq!(
parse_substrate_header(&raw_header).unwrap(),
Header {
hash: "3dfebb280bd87a4640f89d7f2adecd62b88148747bff5b63af6e1634ee37a56e"
.parse()
.unwrap(),
parent_hash: "c0ac300d4005141ea690f3df593e049739c227316eb7f05052f3ee077388b68b"
.parse()
.unwrap(),
number: 8,
signal: Some(ValidatorsSetSignal {
delay: 8,
validators: authorities.encode(),
}),
},
);
}
/// Number of the example block with justification.
const EXAMPLE_JUSTIFIED_BLOCK_NUMBER: u32 = 8;
/// Hash of the example block with justification.
const EXAMPLE_JUSTIFIED_BLOCK_HASH: &'static str =
"a2f45892db86b2ad133ce57d81b7e4375bb7035ce9883e6b68c358164f343775";
/// Id of authorities set that have generated example justification. Could be computed by tracking
/// every set change in canonized headers.
const EXAMPLE_AUTHORITIES_SET_ID: u64 = 0;
/// Encoded authorities set that has generated example justification. Could be fetched from `ScheduledChange`
/// digest of the block that has scheduled this set OR by calling `GrandpaApi::grandpa_authorities()` at
/// appropriate block.
const EXAMPLE_AUTHORITIES_SET: &'static str = "1488dc3417d5058ec4b4503e0c12ea1a0a89be200fe98922423d4334014fa6b0ee0100000000000000d17c2d7823ebf260fd138f2d7e27d114c0145d968b5ff5006125f2414fadae690100000000000000439660b36c6c03afafca027b910b4fecf99801834c62a5e6006f27d978de234f01000000000000005e639b43e0052c47447dac87d6fd2b6ec50bdd4d0f614e4299c665249bbd09d901000000000000001dfe3e22cc0d45c70779c1095f7489a8ef3cf52d62fbd8c2fa38c9f1723502b50100000000000000";
/// Example justification. Could be fetched by calling 'chain_getBlock' RPC.
const EXAMPLE_JUSTIFICATION: &'static str = "2600000000000000a2f45892db86b2ad133ce57d81b7e4375bb7035ce9883e6b68c358164f3437750800000010a2f45892db86b2ad133ce57d81b7e4375bb7035ce9883e6b68c358164f34377508000000d66b4ceb57ef8bcbc955071b597c8c5d2adcfdbb009c73f8438d342670fdeca9ac60686cbd58105b10f51d0a64a8e73b2e5829b2eab3248a008c472852130b00439660b36c6c03afafca027b910b4fecf99801834c62a5e6006f27d978de234fa2f45892db86b2ad133ce57d81b7e4375bb7035ce9883e6b68c358164f34377508000000f5730c14d3cd22b7661e2f5fcb3139dd5fef37f946314a441d01b40ce1200ef70d810525f23fd278b588cd67473c200bda83c338c407b479386aa83798e5970b5e639b43e0052c47447dac87d6fd2b6ec50bdd4d0f614e4299c665249bbd09d9a2f45892db86b2ad133ce57d81b7e4375bb7035ce9883e6b68c358164f34377508000000c78d6ec463f476461a695b4791d30e7626d16fdf72d7c252c2cad387495a97e8c2827ed4d5af853d6e05d31cb6fb7438c9481a7e9c6990d60a9bfaf6a6e1930988dc3417d5058ec4b4503e0c12ea1a0a89be200fe98922423d4334014fa6b0eea2f45892db86b2ad133ce57d81b7e4375bb7035ce9883e6b68c358164f3437750800000052b4fc52d430286b3e2d650aa6e01b6ff4fae8b968893a62be789209eb97ee6e23780d3f5af7042d85bb48f1b202890b22724dfebce138826f66a5e00324320fd17c2d7823ebf260fd138f2d7e27d114c0145d968b5ff5006125f2414fadae6900";
#[test]
fn substrate_header_parse_fails() {
assert!(matches!(parse_substrate_header(&[]), Err(_)));
}
#[test]
fn verify_substrate_finality_proof_succeeds() {
verify_substrate_finality_proof(
EXAMPLE_JUSTIFIED_BLOCK_NUMBER,
EXAMPLE_JUSTIFIED_BLOCK_HASH.parse().unwrap(),
EXAMPLE_AUTHORITIES_SET_ID,
&hex::decode(EXAMPLE_AUTHORITIES_SET).unwrap(),
&hex::decode(EXAMPLE_JUSTIFICATION).unwrap(),
)
.unwrap();
}
#[test]
fn verify_substrate_finality_proof_fails_when_wrong_block_is_finalized() {
verify_substrate_finality_proof(
4,
Default::default(),
EXAMPLE_AUTHORITIES_SET_ID,
&hex::decode(EXAMPLE_AUTHORITIES_SET).unwrap(),
&hex::decode(EXAMPLE_JUSTIFICATION).unwrap(),
)
.unwrap_err();
}
#[test]
fn verify_substrate_finality_proof_fails_when_wrong_set_is_provided() {
verify_substrate_finality_proof(
EXAMPLE_JUSTIFIED_BLOCK_NUMBER,
EXAMPLE_JUSTIFIED_BLOCK_HASH.parse().unwrap(),
EXAMPLE_AUTHORITIES_SET_ID,
&hex::decode("deadbeef").unwrap(),
&hex::decode(EXAMPLE_JUSTIFICATION).unwrap(),
)
.unwrap_err();
}
#[test]
fn verify_substrate_finality_proof_fails_when_wrong_set_id_is_provided() {
verify_substrate_finality_proof(
EXAMPLE_JUSTIFIED_BLOCK_NUMBER,
EXAMPLE_JUSTIFIED_BLOCK_HASH.parse().unwrap(),
42,
&hex::decode(EXAMPLE_AUTHORITIES_SET).unwrap(),
&hex::decode(EXAMPLE_JUSTIFICATION).unwrap(),
)
.unwrap_err();
}
#[test]
fn verify_substrate_finality_proof_fails_when_wrong_proof_is_provided() {
verify_substrate_finality_proof(
EXAMPLE_JUSTIFIED_BLOCK_NUMBER,
EXAMPLE_JUSTIFIED_BLOCK_HASH.parse().unwrap(),
0,
&hex::decode(EXAMPLE_AUTHORITIES_SET).unwrap(),
&hex::decode("deadbeef").unwrap(),
)
.unwrap_err();
}
}
@@ -79,19 +79,18 @@
"type": "function"
},
{
"inputs": [
{
"internalType": "bytes32",
"name": "headerHash",
"type": "bytes32"
}
],
"name": "isFinalityProofRequired",
"inputs": [],
"name": "incompleteHeaders",
"outputs": [
{
"internalType": "bool",
"internalType": "uint256[]",
"name": "",
"type": "bool"
"type": "uint256[]"
},
{
"internalType": "bytes32[]",
"name": "",
"type": "bytes32[]"
}
],
"stateMutability": "view",
File diff suppressed because one or more lines are too long
@@ -1,5 +1,5 @@
Last Change Date: 2020-04-28
Last Change Date: 2020-05-01
Solc version: 0.6.6+commit.6c089d02
Source hash (keccak256): 0xdc46aff04e37129265223e507d17f1407a70cb1ecea3230e1eaa77a17586724d
Source gist: https://gist.github.com/svyatonik/876b388f9507a8de242cb2db9547c4f0
Source hash (keccak256): 0x36403636ad41082ca6c937c60ab06446cd9ef7036c178fa2f04d7c8286544d39
Source gist: https://github.com/svyatonik/substrate-bridge-sol/blob/8b54f5f648f8685fecd52b7af1deb277922b0fc3/substrate-bridge.sol
Compiler flags used (command to produce the file): `docker run -i ethereum/solc:0.6.6 --optimize --bin - < substrate-bridge.sol`
+60 -1
View File
@@ -15,7 +15,7 @@
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::ethereum_types::{Address, Bytes, EthereumHeaderId, Header, Receipt, TransactionHash, H256, U256, U64};
use crate::substrate_types::{Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId};
use crate::substrate_types::{GrandpaJustification, Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId};
use crate::sync_types::{HeaderId, MaybeConnectionError};
use crate::{bail_on_arg_error, bail_on_error};
use codec::{Decode, Encode};
@@ -26,6 +26,7 @@ use jsonrpsee::transport::http::{HttpTransportClient, RequestError};
use parity_crypto::publickey::KeyPair;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::{from_value, to_value};
use std::collections::HashSet;
// to encode/decode contract calls
ethabi_contract::use_contract!(bridge_contract, "res/substrate-bridge-abi.json");
@@ -300,6 +301,64 @@ pub async fn submit_substrate_headers(
(client, Ok(ids))
}
/// Returns ids of incomplete Substrate headers.
pub async fn incomplete_substrate_headers(
client: Client,
contract_address: Address,
) -> (Client, Result<HashSet<SubstrateHeaderId>, Error>) {
let (encoded_call, call_decoder) = bridge_contract::functions::incomplete_headers::call();
let call_request = bail_on_arg_error!(
to_value(CallRequest {
to: Some(contract_address),
data: Some(encoded_call.into()),
})
.map_err(|e| Error::RequestSerialization(e)),
client
);
let (client, call_result) =
bail_on_error!(call_rpc::<Bytes>(client, "eth_call", Params::Array(vec![call_request]),).await);
match call_decoder.decode(&call_result.0) {
Ok((incomplete_headers_numbers, incomplete_headers_hashes)) => (
client,
Ok(incomplete_headers_numbers
.into_iter()
.zip(incomplete_headers_hashes)
.filter_map(|(number, hash)| {
if number != number.low_u32().into() {
return None;
}
Some(HeaderId(number.low_u32(), hash))
})
.collect()),
),
Err(error) => (client, Err(Error::ResponseParseFailed(format!("{}", error)))),
}
}
/// Complete Substrate header.
pub async fn complete_substrate_header(
client: Client,
params: EthereumSigningParams,
contract_address: Address,
id: SubstrateHeaderId,
justification: GrandpaJustification,
) -> (Client, Result<SubstrateHeaderId, Error>) {
let (client, _) = bail_on_error!(
submit_ethereum_transaction(
client,
&params,
Some(contract_address),
None,
false,
bridge_contract::functions::import_finality_proof::encode_input(id.0, id.1, justification,),
)
.await
);
(client, Ok(id))
}
/// Deploy bridge contract.
pub async fn deploy_bridge_contract(
client: Client,
@@ -14,19 +14,21 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Ethereum PoA -> Substrate synchronization.
use crate::ethereum_client::{self, EthereumConnectionParams};
use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, QueuedEthereumHeader, Receipt};
use crate::substrate_client::{self, SubstrateConnectionParams, SubstrateSigningParams};
use crate::sync::{HeadersSyncParams, TargetTransactionMode};
use crate::sync_loop::{SourceClient, TargetClient};
use futures::future::FutureExt;
use std::{future::Future, pin::Pin};
use crate::sync_loop::{OwnedSourceFutureOutput, OwnedTargetFutureOutput, SourceClient, TargetClient};
use futures::future::{ready, FutureExt, Ready};
use std::{collections::HashSet, future::Future, pin::Pin, time::Duration};
use web3::types::H256;
/// Interval (in ms) at which we check new Ethereum headers when we are synced/almost synced.
const ETHEREUM_TICK_INTERVAL_MS: u64 = 10_000;
/// Interval (in ms) at which we check new Substrate blocks.
const SUBSTRATE_TICK_INTERVAL_MS: u64 = 5_000;
/// Interval at which we check new Ethereum headers when we are synced/almost synced.
const ETHEREUM_TICK_INTERVAL: Duration = Duration::from_secs(10);
/// Interval at which we check new Substrate blocks.
const SUBSTRATE_TICK_INTERVAL: Duration = Duration::from_secs(5);
/// Max number of headers in single submit transaction.
const MAX_HEADERS_IN_SINGLE_SUBMIT: usize = 32;
/// Max total size of headers in single submit transaction. This only affects signed
@@ -76,13 +78,15 @@ struct EthereumHeadersSource {
client: ethereum_client::Client,
}
type EthereumFutureOutput<T> = OwnedSourceFutureOutput<EthereumHeadersSource, EthereumHeadersSyncPipeline, T>;
impl SourceClient<EthereumHeadersSyncPipeline> for EthereumHeadersSource {
type Error = ethereum_client::Error;
type BestBlockNumberFuture = Pin<Box<dyn Future<Output = (Self, Result<u64, Self::Error>)>>>;
type HeaderByHashFuture = Pin<Box<dyn Future<Output = (Self, Result<Header, Self::Error>)>>>;
type HeaderByNumberFuture = Pin<Box<dyn Future<Output = (Self, Result<Header, Self::Error>)>>>;
type HeaderExtraFuture =
Pin<Box<dyn Future<Output = (Self, Result<(EthereumHeaderId, Vec<Receipt>), Self::Error>)>>>;
type BestBlockNumberFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<u64>>>>;
type HeaderByHashFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<Header>>>>;
type HeaderByNumberFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<Header>>>>;
type HeaderExtraFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<(EthereumHeaderId, Vec<Receipt>)>>>>;
type HeaderCompletionFuture = Ready<EthereumFutureOutput<(EthereumHeaderId, Option<()>)>>;
fn best_block_number(self) -> Self::BestBlockNumberFuture {
ethereum_client::best_block_number(self.client)
@@ -107,6 +111,10 @@ impl SourceClient<EthereumHeadersSyncPipeline> for EthereumHeadersSource {
.map(|(client, result)| (EthereumHeadersSource { client }, result))
.boxed()
}
fn header_completion(self, id: EthereumHeaderId) -> Self::HeaderCompletionFuture {
ready((self, Ok((id, None))))
}
}
/// Substrate client as Ethereum headers target.
@@ -119,12 +127,16 @@ struct SubstrateHeadersTarget {
sign_params: SubstrateSigningParams,
}
type SubstrateFutureOutput<T> = OwnedTargetFutureOutput<SubstrateHeadersTarget, EthereumHeadersSyncPipeline, T>;
impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
type Error = substrate_client::Error;
type BestHeaderIdFuture = Pin<Box<dyn Future<Output = (Self, Result<EthereumHeaderId, Self::Error>)>>>;
type IsKnownHeaderFuture = Pin<Box<dyn Future<Output = (Self, Result<(EthereumHeaderId, bool), Self::Error>)>>>;
type RequiresExtraFuture = Pin<Box<dyn Future<Output = (Self, Result<(EthereumHeaderId, bool), Self::Error>)>>>;
type SubmitHeadersFuture = Pin<Box<dyn Future<Output = (Self, Result<Vec<EthereumHeaderId>, Self::Error>)>>>;
type BestHeaderIdFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<EthereumHeaderId>>>>;
type IsKnownHeaderFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<(EthereumHeaderId, bool)>>>>;
type RequiresExtraFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<(EthereumHeaderId, bool)>>>>;
type SubmitHeadersFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<Vec<EthereumHeaderId>>>>>;
type IncompleteHeadersFuture = Ready<SubstrateFutureOutput<HashSet<EthereumHeaderId>>>;
type CompleteHeadersFuture = Ready<SubstrateFutureOutput<EthereumHeaderId>>;
fn best_header_id(self) -> Self::BestHeaderIdFuture {
let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params);
@@ -192,6 +204,14 @@ impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
})
.boxed()
}
fn incomplete_headers_ids(self) -> Self::IncompleteHeadersFuture {
ready((self, Ok(HashSet::new())))
}
fn complete_header(self, id: EthereumHeaderId, _completion: ()) -> Self::CompleteHeadersFuture {
ready((self, Ok(id)))
}
}
/// Run Ethereum headers synchronization.
@@ -206,13 +226,13 @@ pub fn run(params: EthereumSyncParams) {
crate::sync_loop::run(
EthereumHeadersSource { client: eth_client },
ETHEREUM_TICK_INTERVAL_MS,
ETHEREUM_TICK_INTERVAL,
SubstrateHeadersTarget {
client: sub_client,
sign_transactions: sign_sub_transactions,
sign_params: params.sub_sign,
},
SUBSTRATE_TICK_INTERVAL_MS,
SUBSTRATE_TICK_INTERVAL,
params.sync_params,
);
}
@@ -56,6 +56,7 @@ impl HeadersSyncPipeline for EthereumHeadersSyncPipeline {
type Number = u64;
type Header = Header;
type Extra = Vec<Receipt>;
type Completion = ();
fn estimate_size(source: &QueuedHeader<Self>) -> usize {
into_substrate_ethereum_header(source.header()).encode().len()
+543 -90
View File
@@ -15,9 +15,11 @@
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SourceHeader};
use linked_hash_map::LinkedHashMap;
use num_traits::{One, Zero};
use std::collections::{
btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry, BTreeMap, HashMap, HashSet,
use std::{
collections::{btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry, BTreeMap, HashMap, HashSet},
time::{Duration, Instant},
};
type HeadersQueue<P> =
@@ -25,6 +27,9 @@ type HeadersQueue<P> =
type KnownHeaders<P> =
BTreeMap<<P as HeadersSyncPipeline>::Number, HashMap<<P as HeadersSyncPipeline>::Hash, HeaderStatus>>;
/// We're trying to fetch completion data for single header at this interval.
const RETRY_FETCH_COMPLETION_INTERVAL: Duration = Duration::from_secs(20);
/// Ethereum headers queue.
#[derive(Debug)]
pub struct QueuedHeaders<P: HeadersSyncPipeline> {
@@ -43,16 +48,34 @@ pub struct QueuedHeaders<P: HeadersSyncPipeline> {
extra: HeadersQueue<P>,
/// Headers that are ready to be submitted to target node.
ready: HeadersQueue<P>,
/// Headers that are ready to be submitted to target node, but their ancestor is incomplete.
/// Thus we're waiting for these ancestors to be completed first.
/// Note that the incomplete header itself is synced and it isn't in this queue.
incomplete: HeadersQueue<P>,
/// Headers that are (we believe) currently submitted to target node by our,
/// not-yet mined transactions.
submitted: HeadersQueue<P>,
/// Pointers to all headers that we ever seen and we believe we can touch in the future.
known_headers: KnownHeaders<P>,
/// Headers that are waiting for completion data from source node. Mapped (and auto-sorted
/// by) to the last fetch time.
incomplete_headers: LinkedHashMap<HeaderId<P::Hash, P::Number>, Option<Instant>>,
/// Headers that are waiting to be completed at target node. Auto-sorted by insertion time.
completion_data: LinkedHashMap<HeaderId<P::Hash, P::Number>, P::Completion>,
/// Pruned blocks border. We do not store or accept any blocks with number less than
/// this number.
prune_border: P::Number,
}
/// Header completion data.
#[derive(Debug)]
struct HeaderCompletion<Completion> {
/// Last time when we tried to upload completion data to target node, if ever.
pub last_upload_time: Option<Instant>,
/// Completion data.
pub completion: Completion,
}
impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
/// Returns new QueuedHeaders.
pub fn new() -> Self {
@@ -62,8 +85,11 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
maybe_extra: HeadersQueue::new(),
extra: HeadersQueue::new(),
ready: HeadersQueue::new(),
incomplete: HeadersQueue::new(),
submitted: HeadersQueue::new(),
known_headers: KnownHeaders::<P>::new(),
incomplete_headers: LinkedHashMap::new(),
completion_data: LinkedHashMap::new(),
prune_border: Zero::zero(),
}
}
@@ -89,6 +115,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
.fold(0, |total, headers| total + headers.len()),
HeaderStatus::Extra => self.extra.values().fold(0, |total, headers| total + headers.len()),
HeaderStatus::Ready => self.ready.values().fold(0, |total, headers| total + headers.len()),
HeaderStatus::Incomplete => self.incomplete.values().fold(0, |total, headers| total + headers.len()),
HeaderStatus::Submitted => self.submitted.values().fold(0, |total, headers| total + headers.len()),
}
}
@@ -105,6 +132,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
.fold(0, |total, headers| total + headers.len())
+ self.extra.values().fold(0, |total, headers| total + headers.len())
+ self.ready.values().fold(0, |total, headers| total + headers.len())
+ self.incomplete.values().fold(0, |total, headers| total + headers.len())
}
/// Returns number of best block in the queue.
@@ -119,7 +147,10 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
self.extra.keys().next_back().cloned().unwrap_or_else(Zero::zero),
std::cmp::max(
self.ready.keys().next_back().cloned().unwrap_or_else(Zero::zero),
self.submitted.keys().next_back().cloned().unwrap_or_else(Zero::zero),
std::cmp::max(
self.incomplete.keys().next_back().cloned().unwrap_or_else(Zero::zero),
self.submitted.keys().next_back().cloned().unwrap_or_else(Zero::zero),
),
),
),
),
@@ -145,6 +176,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
HeaderStatus::MaybeExtra => oldest_header(&self.maybe_extra),
HeaderStatus::Extra => oldest_header(&self.extra),
HeaderStatus::Ready => oldest_header(&self.ready),
HeaderStatus::Incomplete => oldest_header(&self.incomplete),
HeaderStatus::Submitted => oldest_header(&self.submitted),
}
}
@@ -162,6 +194,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
HeaderStatus::MaybeExtra => oldest_headers(&self.maybe_extra, f),
HeaderStatus::Extra => oldest_headers(&self.extra, f),
HeaderStatus::Ready => oldest_headers(&self.ready, f),
HeaderStatus::Incomplete => oldest_headers(&self.incomplete, f),
HeaderStatus::Submitted => oldest_headers(&self.submitted, f),
}
}
@@ -207,6 +240,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
HeaderStatus::MaybeExtra
| HeaderStatus::Extra
| HeaderStatus::Ready
| HeaderStatus::Incomplete
| HeaderStatus::Submitted
| HeaderStatus::Synced => {
insert_header(&mut self.maybe_extra, id, header);
@@ -226,67 +260,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
/// Receive best header from the target node.
pub fn target_best_header_response(&mut self, id: &HeaderId<P::Hash, P::Number>) {
// all ancestors of this header are now synced => let's remove them from
// queues
let mut current = *id;
let mut id_processed = false;
loop {
let header = match self.status(&current) {
HeaderStatus::Unknown => break,
HeaderStatus::MaybeOrphan => remove_header(&mut self.maybe_orphan, &current),
HeaderStatus::Orphan => remove_header(&mut self.orphan, &current),
HeaderStatus::MaybeExtra => remove_header(&mut self.maybe_extra, &current),
HeaderStatus::Extra => remove_header(&mut self.extra, &current),
HeaderStatus::Ready => remove_header(&mut self.ready, &current),
HeaderStatus::Submitted => remove_header(&mut self.submitted, &current),
HeaderStatus::Synced => break,
}
.expect("header has a given status; given queue has the header; qed");
log::debug!(
target: "bridge",
"{} header {:?} is now {:?}",
P::SOURCE_NAME,
current,
HeaderStatus::Synced,
);
*self
.known_headers
.entry(current.0)
.or_default()
.entry(current.1)
.or_insert(HeaderStatus::Synced) = HeaderStatus::Synced;
current = header.parent_id();
id_processed = true;
}
// remember that the header is synced
if !id_processed {
// to avoid duplicate log message
log::debug!(
target: "bridge",
"{} header {:?} is now {:?}",
P::SOURCE_NAME,
id,
HeaderStatus::Synced,
);
*self
.known_headers
.entry(id.0)
.or_default()
.entry(id.1)
.or_insert(HeaderStatus::Synced) = HeaderStatus::Synced;
}
// now let's move all descendants from maybe_orphan && orphan queues to
// maybe_extra queue
move_header_descendants::<P>(
&mut [&mut self.maybe_orphan, &mut self.orphan],
&mut self.maybe_extra,
&mut self.known_headers,
HeaderStatus::MaybeExtra,
id,
);
self.header_synced(id)
}
/// Receive target node response for MaybeOrphan request.
@@ -315,6 +289,8 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
pub fn maybe_extra_response(&mut self, id: &HeaderId<P::Hash, P::Number>, response: bool) {
let (destination_status, destination_queue) = if response {
(HeaderStatus::Extra, &mut self.extra)
} else if self.is_parent_incomplete(id) {
(HeaderStatus::Incomplete, &mut self.incomplete)
} else {
(HeaderStatus::Ready, &mut self.ready)
};
@@ -331,17 +307,42 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
/// Receive extra from source node.
pub fn extra_response(&mut self, id: &HeaderId<P::Hash, P::Number>, extra: P::Extra) {
let (destination_status, destination_queue) = if self.is_parent_incomplete(id) {
(HeaderStatus::Incomplete, &mut self.incomplete)
} else {
(HeaderStatus::Ready, &mut self.ready)
};
// move header itself from extra to ready queue
move_header(
&mut self.extra,
&mut self.ready,
destination_queue,
&mut self.known_headers,
HeaderStatus::Ready,
destination_status,
id,
|header| header.set_extra(extra),
);
}
/// Receive completion response from source node.
pub fn completion_response(&mut self, id: &HeaderId<P::Hash, P::Number>, completion: Option<P::Completion>) {
let completion = match completion {
Some(completion) => completion,
None => return, // we'll try refetch later
};
if self.incomplete_headers.remove(id).is_some() {
log::debug!(
target: "bridge",
"Received completion data from {} for header: {:?}",
P::SOURCE_NAME,
id,
);
self.completion_data.insert(id.clone(), completion);
}
}
/// When header is submitted to target node.
pub fn headers_submitted(&mut self, ids: Vec<HeaderId<P::Hash, P::Number>>) {
for id in ids {
@@ -356,7 +357,110 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
}
}
/// Prune and never accep headers before this block.
/// When header completion data is sent to target node.
pub fn header_completed(&mut self, id: &HeaderId<P::Hash, P::Number>) {
if self.completion_data.remove(id).is_some() {
log::debug!(
target: "bridge",
"Sent completion data to {} for header: {:?}",
P::TARGET_NAME,
id,
);
// transaction can be dropped by target chain nodes => it would never be mined
//
// in current implementation the sync loop would wait for some time && if best
// **source** header won't change on **target** node, then the sync will be restarted
// => we'll resubmit the same completion data again (the same is true for submitted
// headers)
//
// the other option would be to track emitted transactions at least on target node,
// but it won't give us 100% guarantee anyway
//
// => we're just dropping completion data just after it has been submitted
}
}
/// When incomplete headers ids are receved from target node.
pub fn incomplete_headers_response(&mut self, ids: HashSet<HeaderId<P::Hash, P::Number>>) {
// all new incomplete headers are marked Synced and all their descendants
// are moved from Ready/Submitted to Incomplete queue
let new_incomplete_headers = ids
.iter()
.filter(|id| !self.incomplete_headers.contains_key(id) && !self.completion_data.contains_key(id))
.cloned()
.collect::<Vec<_>>();
for new_incomplete_header in new_incomplete_headers {
self.header_synced(&new_incomplete_header);
move_header_descendants::<P>(
&mut [&mut self.ready, &mut self.submitted],
&mut self.incomplete,
&mut self.known_headers,
HeaderStatus::Incomplete,
&new_incomplete_header,
);
log::debug!(
target: "bridge",
"Scheduling completion data retrieval for header: {:?}",
new_incomplete_header,
);
self.incomplete_headers.insert(new_incomplete_header, None);
}
// for all headers that were incompleted previously, but now are completed, we move
// all descendants from incomplete to ready
let just_completed_headers = self
.incomplete_headers
.keys()
.chain(self.completion_data.keys())
.filter(|id| !ids.contains(id))
.cloned()
.collect::<Vec<_>>();
for just_completed_header in just_completed_headers {
move_header_descendants::<P>(
&mut [&mut self.incomplete],
&mut self.ready,
&mut self.known_headers,
HeaderStatus::Ready,
&just_completed_header,
);
log::debug!(
target: "bridge",
"Completion data is no longer required for header: {:?}",
just_completed_header,
);
self.incomplete_headers.remove(&just_completed_header);
self.completion_data.remove(&just_completed_header);
}
}
/// Returns id of the header for which we want to fetch completion data.
pub fn incomplete_header(&mut self) -> Option<HeaderId<P::Hash, P::Number>> {
queued_incomplete_header(&mut self.incomplete_headers, |last_fetch_time| {
let retry = match *last_fetch_time {
Some(last_fetch_time) => last_fetch_time.elapsed() > RETRY_FETCH_COMPLETION_INTERVAL,
None => true,
};
if retry {
*last_fetch_time = Some(Instant::now());
}
retry
})
.map(|(id, _)| id)
}
/// Returns header completion data to upload to target node.
pub fn header_to_complete(&mut self) -> Option<(HeaderId<P::Hash, P::Number>, &P::Completion)> {
queued_incomplete_header(&mut self.completion_data, |_| true)
}
/// Prune and never accept headers before this block.
pub fn prune(&mut self, prune_border: P::Number) {
if prune_border <= self.prune_border {
return;
@@ -368,6 +472,7 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
prune_queue(&mut self.extra, prune_border);
prune_queue(&mut self.ready, prune_border);
prune_queue(&mut self.submitted, prune_border);
prune_queue(&mut self.incomplete, prune_border);
prune_known_headers::<P>(&mut self.known_headers, prune_border);
self.prune_border = prune_border;
}
@@ -379,10 +484,81 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
self.maybe_extra.clear();
self.extra.clear();
self.ready.clear();
self.incomplete.clear();
self.submitted.clear();
self.known_headers.clear();
self.prune_border = Zero::zero();
}
/// Returns true if parent of this header is either incomplete or waiting for
/// its own incomplete ancestor to be completed.
fn is_parent_incomplete(&self, id: &HeaderId<P::Hash, P::Number>) -> bool {
let status = self.status(id);
let header = match status {
HeaderStatus::MaybeOrphan => header(&self.maybe_orphan, id),
HeaderStatus::Orphan => header(&self.orphan, id),
HeaderStatus::MaybeExtra => header(&self.maybe_extra, id),
HeaderStatus::Extra => header(&self.extra, id),
HeaderStatus::Ready => header(&self.ready, id),
HeaderStatus::Incomplete => header(&self.incomplete, id),
HeaderStatus::Submitted => header(&self.submitted, id),
HeaderStatus::Unknown => return false,
HeaderStatus::Synced => return false,
};
match header {
Some(header) => {
let parent_id = header.header().parent_id();
self.incomplete_headers.contains_key(&parent_id)
|| self.completion_data.contains_key(&parent_id)
|| self.status(&parent_id) == HeaderStatus::Incomplete
}
None => false,
}
}
/// When we receive new Synced header from target node.
fn header_synced(&mut self, id: &HeaderId<P::Hash, P::Number>) {
// all ancestors of this header are now synced => let's remove them from
// queues
let mut current = *id;
let mut id_processed = false;
loop {
let header = match self.status(&current) {
HeaderStatus::Unknown => break,
HeaderStatus::MaybeOrphan => remove_header(&mut self.maybe_orphan, &current),
HeaderStatus::Orphan => remove_header(&mut self.orphan, &current),
HeaderStatus::MaybeExtra => remove_header(&mut self.maybe_extra, &current),
HeaderStatus::Extra => remove_header(&mut self.extra, &current),
HeaderStatus::Ready => remove_header(&mut self.ready, &current),
HeaderStatus::Incomplete => remove_header(&mut self.incomplete, &current),
HeaderStatus::Submitted => remove_header(&mut self.submitted, &current),
HeaderStatus::Synced => break,
}
.expect("header has a given status; given queue has the header; qed");
set_header_status::<P>(&mut self.known_headers, &current, HeaderStatus::Synced);
current = header.parent_id();
id_processed = true;
}
// remember that the header itself is synced
// (condition is here to avoid duplicate log messages)
if !id_processed {
set_header_status::<P>(&mut self.known_headers, &id, HeaderStatus::Synced);
}
// now let's move all descendants from maybe_orphan && orphan queues to
// maybe_extra queue
move_header_descendants::<P>(
&mut [&mut self.maybe_orphan, &mut self.orphan],
&mut self.maybe_extra,
&mut self.known_headers,
HeaderStatus::MaybeExtra,
id,
);
}
}
/// Insert header to the queue.
@@ -411,6 +587,14 @@ fn remove_header<P: HeadersSyncPipeline>(
header
}
/// Get header from the queue.
fn header<'a, P: HeadersSyncPipeline>(
queue: &'a HeadersQueue<P>,
id: &HeaderId<P::Hash, P::Number>,
) -> Option<&'a QueuedHeader<P>> {
queue.get(&id.0).and_then(|by_hash| by_hash.get(&id.1))
}
/// Move header from source to destination queue.
///
/// Returns ID of parent header, if header has been moved, or None otherwise.
@@ -428,16 +612,8 @@ fn move_header<P: HeadersSyncPipeline>(
};
let parent_id = header.header().parent_id();
known_headers.entry(id.0).or_default().insert(id.1, destination_status);
destination_queue.entry(id.0).or_default().insert(id.1, header);
log::debug!(
target: "bridge",
"{} header {:?} is now {:?}",
P::SOURCE_NAME,
id,
destination_status,
);
set_header_status::<P>(known_headers, id, destination_status);
Some(parent_id)
}
@@ -473,19 +649,8 @@ fn move_header_descendants<P: HeadersSyncPipeline>(
if current_parents.contains(&entry.get().header().parent_id().1) {
let header_to_move = entry.remove();
let header_to_move_id = header_to_move.id();
known_headers
.entry(header_to_move_id.0)
.or_default()
.insert(header_to_move_id.1, destination_status);
headers_to_move.push((header_to_move_id, header_to_move));
log::debug!(
target: "bridge",
"{} header {:?} is now {:?}",
P::SOURCE_NAME,
header_to_move_id,
destination_status,
);
set_header_status::<P>(known_headers, &header_to_move_id, destination_status);
}
}
@@ -544,6 +709,44 @@ fn prune_known_headers<P: HeadersSyncPipeline>(known_headers: &mut KnownHeaders<
*known_headers = new_known_headers;
}
/// Change header status.
fn set_header_status<P: HeadersSyncPipeline>(
known_headers: &mut KnownHeaders<P>,
id: &HeaderId<P::Hash, P::Number>,
status: HeaderStatus,
) {
log::debug!(
target: "bridge",
"{} header {:?} is now {:?}",
P::SOURCE_NAME,
id,
status,
);
*known_headers.entry(id.0).or_default().entry(id.1).or_insert(status) = status;
}
/// Returns queued incomplete header with maximal elapsed time since last update.
fn queued_incomplete_header<Id: Clone + Eq + std::hash::Hash, T>(
map: &mut LinkedHashMap<Id, T>,
filter: impl FnMut(&mut T) -> bool,
) -> Option<(Id, &T)> {
// TODO (#84): headers that have been just appended to the end of the queue would have to wait until
// all previous headers will be retried
let retry_old_header = map
.front()
.map(|(key, _)| key.clone())
.and_then(|key| map.get_mut(&key).map(filter))
.unwrap_or(false);
if retry_old_header {
let (header_key, header) = map.pop_front().expect("we have checked that front() exists; qed");
map.insert(header_key, header);
return map.back().map(|(id, data)| (id.clone(), data));
}
None
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
@@ -595,7 +798,11 @@ pub(crate) mod tests {
hash(6),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
);
assert_eq!(queue.total_headers(), 6);
queue.incomplete.entry(6).or_default().insert(
hash(7),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
);
assert_eq!(queue.total_headers(), 7);
}
#[test]
@@ -639,6 +846,12 @@ pub(crate) mod tests {
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
);
assert_eq!(queue.best_queued_number(), 40);
// and then there's some header in Incomplete
queue.incomplete.entry(50).or_default().insert(
hash(50),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
);
assert_eq!(queue.best_queued_number(), 50);
}
#[test]
@@ -946,6 +1159,7 @@ pub(crate) mod tests {
#[test]
fn negative_maybe_extra_response_works() {
// when parent header is complete
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
queue
.known_headers
@@ -957,10 +1171,24 @@ pub(crate) mod tests {
assert!(queue.maybe_extra.is_empty());
assert_eq!(queue.ready.len(), 1);
assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Ready);
// when parent header is incomplete
queue.incomplete_headers.insert(id(200), None);
queue
.known_headers
.entry(201)
.or_default()
.insert(hash(201), HeaderStatus::MaybeExtra);
queue.maybe_extra.entry(201).or_default().insert(hash(201), header(201));
queue.maybe_extra_response(&id(201), false);
assert!(queue.maybe_extra.is_empty());
assert_eq!(queue.incomplete.len(), 1);
assert_eq!(queue.known_headers[&201][&hash(201)], HeaderStatus::Incomplete);
}
#[test]
fn receipts_response_works() {
// when parent header is complete
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
queue
.known_headers
@@ -972,6 +1200,19 @@ pub(crate) mod tests {
assert!(queue.extra.is_empty());
assert_eq!(queue.ready.len(), 1);
assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Ready);
// when parent header is incomplete
queue.incomplete_headers.insert(id(200), None);
queue
.known_headers
.entry(201)
.or_default()
.insert(hash(201), HeaderStatus::Extra);
queue.extra.entry(201).or_default().insert(hash(201), header(201));
queue.extra_response(&id(201), Vec::new());
assert!(queue.extra.is_empty());
assert_eq!(queue.incomplete.len(), 1);
assert_eq!(queue.known_headers[&201][&hash(201)], HeaderStatus::Incomplete);
}
#[test]
@@ -988,9 +1229,172 @@ pub(crate) mod tests {
assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Submitted);
}
#[test]
fn incomplete_header_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
// nothing to complete if queue is empty
assert_eq!(queue.incomplete_header(), None);
// when there's new header to complete => ask for completion data
queue.incomplete_headers.insert(id(100), None);
assert_eq!(queue.incomplete_header(), Some(id(100)));
// we have just asked for completion data => nothing to request
assert_eq!(queue.incomplete_header(), None);
// enough time have passed => ask again
queue.incomplete_headers.clear();
queue.incomplete_headers.insert(
id(100),
Some(Instant::now() - RETRY_FETCH_COMPLETION_INTERVAL - RETRY_FETCH_COMPLETION_INTERVAL),
);
assert_eq!(queue.incomplete_header(), Some(id(100)));
}
#[test]
fn completion_response_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
queue.incomplete_headers.insert(id(100), None);
queue.incomplete_headers.insert(id(200), Some(Instant::now()));
// when headers isn't incompete, nothing changes
queue.completion_response(&id(300), None);
assert_eq!(queue.incomplete_headers.len(), 2);
assert_eq!(queue.completion_data.len(), 0);
assert_eq!(queue.header_to_complete(), None);
// when response is None, nothing changes
queue.completion_response(&id(100), None);
assert_eq!(queue.incomplete_headers.len(), 2);
assert_eq!(queue.completion_data.len(), 0);
assert_eq!(queue.header_to_complete(), None);
// when response is Some, we're scheduling completion
queue.completion_response(&id(200), Some(()));
assert_eq!(queue.incomplete_headers.len(), 1);
assert_eq!(queue.completion_data.len(), 1);
assert!(queue.incomplete_headers.contains_key(&id(100)));
assert!(queue.completion_data.contains_key(&id(200)));
assert_eq!(queue.header_to_complete(), Some((id(200), &())));
}
#[test]
fn header_completed_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
queue.completion_data.insert(id(100), ());
// when unknown header is completed
queue.header_completed(&id(200));
assert_eq!(queue.completion_data.len(), 1);
// when known header is completed
queue.header_completed(&id(100));
assert_eq!(queue.completion_data.len(), 0);
}
#[test]
fn incomplete_headers_response_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
// when we have already submitted #101 and #102 is ready
queue
.known_headers
.entry(101)
.or_default()
.insert(hash(101), HeaderStatus::Submitted);
queue.submitted.entry(101).or_default().insert(hash(101), header(101));
queue
.known_headers
.entry(102)
.or_default()
.insert(hash(102), HeaderStatus::Ready);
queue.submitted.entry(102).or_default().insert(hash(102), header(102));
// AND now we know that the #100 is incomplete
queue.incomplete_headers_response(vec![id(100)].into_iter().collect());
// => #101 and #102 are moved to the Incomplete and #100 is now synced
assert_eq!(queue.status(&id(100)), HeaderStatus::Synced);
assert_eq!(queue.status(&id(101)), HeaderStatus::Incomplete);
assert_eq!(queue.status(&id(102)), HeaderStatus::Incomplete);
assert_eq!(queue.submitted.len(), 0);
assert_eq!(queue.ready.len(), 0);
assert!(queue.incomplete.entry(101).or_default().contains_key(&hash(101)));
assert!(queue.incomplete.entry(102).or_default().contains_key(&hash(102)));
assert!(queue.incomplete_headers.contains_key(&id(100)));
assert!(queue.completion_data.is_empty());
// and then header #100 is no longer incomplete
queue.incomplete_headers_response(vec![].into_iter().collect());
// => #101 and #102 are moved to the Ready queue and #100 if now forgotten
assert_eq!(queue.status(&id(100)), HeaderStatus::Synced);
assert_eq!(queue.status(&id(101)), HeaderStatus::Ready);
assert_eq!(queue.status(&id(102)), HeaderStatus::Ready);
assert_eq!(queue.incomplete.len(), 0);
assert_eq!(queue.submitted.len(), 0);
assert!(queue.ready.entry(101).or_default().contains_key(&hash(101)));
assert!(queue.ready.entry(102).or_default().contains_key(&hash(102)));
assert!(queue.incomplete_headers.is_empty());
assert!(queue.completion_data.is_empty());
}
#[test]
fn is_parent_incomplete_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
// when we do not know header itself
assert_eq!(queue.is_parent_incomplete(&id(50)), false);
// when we do not know parent
queue
.known_headers
.entry(100)
.or_default()
.insert(hash(100), HeaderStatus::Incomplete);
queue.incomplete.entry(100).or_default().insert(hash(100), header(100));
assert_eq!(queue.is_parent_incomplete(&id(100)), false);
// when parent is inside incomplete queue (i.e. some other ancestor is actually incomplete)
queue
.known_headers
.entry(101)
.or_default()
.insert(hash(101), HeaderStatus::Submitted);
queue.submitted.entry(101).or_default().insert(hash(101), header(101));
assert_eq!(queue.is_parent_incomplete(&id(101)), true);
// when parent is the incomplete header and we do not have completion data
queue.incomplete_headers.insert(id(199), None);
queue
.known_headers
.entry(200)
.or_default()
.insert(hash(200), HeaderStatus::Submitted);
queue.submitted.entry(200).or_default().insert(hash(200), header(200));
assert_eq!(queue.is_parent_incomplete(&id(200)), true);
// when parent is the incomplete header and we have completion data
queue.completion_data.insert(id(299), ());
queue
.known_headers
.entry(300)
.or_default()
.insert(hash(300), HeaderStatus::Submitted);
queue.submitted.entry(300).or_default().insert(hash(300), header(300));
assert_eq!(queue.is_parent_incomplete(&id(300)), true);
}
#[test]
fn prune_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
queue
.known_headers
.entry(105)
.or_default()
.insert(hash(105), HeaderStatus::Incomplete);
queue.incomplete.entry(105).or_default().insert(hash(105), header(105));
queue
.known_headers
.entry(104)
@@ -1033,7 +1437,8 @@ pub(crate) mod tests {
assert_eq!(queue.maybe_extra.len(), 1);
assert_eq!(queue.orphan.len(), 1);
assert_eq!(queue.maybe_orphan.len(), 1);
assert_eq!(queue.known_headers.len(), 3);
assert_eq!(queue.incomplete.len(), 1);
assert_eq!(queue.known_headers.len(), 4);
queue.prune(110);
@@ -1042,6 +1447,7 @@ pub(crate) mod tests {
assert_eq!(queue.maybe_extra.len(), 0);
assert_eq!(queue.orphan.len(), 0);
assert_eq!(queue.maybe_orphan.len(), 0);
assert_eq!(queue.incomplete.len(), 0);
assert_eq!(queue.known_headers.len(), 0);
queue.header_response(header(109).header().clone());
@@ -1050,4 +1456,51 @@ pub(crate) mod tests {
queue.header_response(header(110).header().clone());
assert_eq!(queue.known_headers.len(), 1);
}
#[test]
fn incomplete_headers_are_still_incomplete_after_advance() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
// relay#1 knows that header#100 is incomplete && it has headers 101..104 in incomplete queue
queue.incomplete_headers.insert(id(100), None);
queue.incomplete.entry(101).or_default().insert(hash(101), header(101));
queue.incomplete.entry(102).or_default().insert(hash(102), header(102));
queue.incomplete.entry(103).or_default().insert(hash(103), header(103));
queue.incomplete.entry(104).or_default().insert(hash(104), header(104));
queue
.known_headers
.entry(100)
.or_default()
.insert(hash(100), HeaderStatus::Synced);
queue
.known_headers
.entry(101)
.or_default()
.insert(hash(101), HeaderStatus::Incomplete);
queue
.known_headers
.entry(102)
.or_default()
.insert(hash(102), HeaderStatus::Incomplete);
queue
.known_headers
.entry(103)
.or_default()
.insert(hash(103), HeaderStatus::Incomplete);
queue
.known_headers
.entry(104)
.or_default()
.insert(hash(104), HeaderStatus::Incomplete);
// let's say relay#2 completes header#100 and then submits header#101+header#102 and it turns
// out that header#102 is also incomplete
queue.incomplete_headers_response(vec![id(102)].into_iter().collect());
// then the header#103 and the header#104 must have Incomplete status
assert_eq!(queue.status(&id(101)), HeaderStatus::Synced);
assert_eq!(queue.status(&id(102)), HeaderStatus::Synced);
assert_eq!(queue.status(&id(103)), HeaderStatus::Incomplete);
assert_eq!(queue.status(&id(104)), HeaderStatus::Incomplete);
}
}
@@ -16,7 +16,8 @@
use crate::ethereum_types::{Bytes, EthereumHeaderId, QueuedEthereumHeader, H256};
use crate::substrate_types::{
into_substrate_ethereum_header, into_substrate_ethereum_receipts, Hash, Header as SubstrateHeader, Number,
into_substrate_ethereum_header, into_substrate_ethereum_receipts, GrandpaJustification, Hash,
Header as SubstrateHeader, Number, SignedBlock as SignedSubstrateBlock, SubstrateHeaderId,
};
use crate::sync_types::{HeaderId, MaybeConnectionError, SourceHeader};
use crate::{bail_on_arg_error, bail_on_error};
@@ -274,6 +275,19 @@ pub async fn submit_unsigned_ethereum_headers(
(client, Ok(ids))
}
/// Get GRANDPA justification for given block.
pub async fn grandpa_justification(
client: Client,
id: SubstrateHeaderId,
) -> (Client, Result<(SubstrateHeaderId, Option<GrandpaJustification>), Error>) {
let hash = bail_on_arg_error!(to_value(id.1).map_err(|e| Error::RequestSerialization(e)), client);
let (client, signed_block) = call_rpc(client, "chain_getBlock", Params::Array(vec![hash]), rpc_returns_value).await;
(
client,
signed_block.map(|signed_block: SignedSubstrateBlock| (id, signed_block.justification)),
)
}
/// Get GRANDPA authorities set at given block.
pub async fn grandpa_authorities_set(client: Client, block: Hash) -> (Client, Result<Vec<u8>, Error>) {
let block = bail_on_arg_error!(to_value(block).map_err(|e| Error::RequestSerialization(e)), client);
@@ -14,22 +14,24 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate -> Ethereum synchronization.
use crate::ethereum_client::{self, EthereumConnectionParams, EthereumSigningParams};
use crate::ethereum_types::Address;
use crate::substrate_client::{self, SubstrateConnectionParams};
use crate::substrate_types::{
Hash, Header, Number, QueuedSubstrateHeader, SubstrateHeaderId, SubstrateHeadersSyncPipeline,
GrandpaJustification, Hash, Header, Number, QueuedSubstrateHeader, SubstrateHeaderId, SubstrateHeadersSyncPipeline,
};
use crate::sync::{HeadersSyncParams, TargetTransactionMode};
use crate::sync_loop::{SourceClient, TargetClient};
use crate::sync_loop::{OwnedSourceFutureOutput, OwnedTargetFutureOutput, SourceClient, TargetClient};
use crate::sync_types::SourceHeader;
use futures::future::{ready, FutureExt, Ready};
use std::{future::Future, pin::Pin};
use std::{collections::HashSet, future::Future, pin::Pin, time::Duration};
/// Interval (in ms) at which we check new Substrate headers when we are synced/almost synced.
const SUBSTRATE_TICK_INTERVAL_MS: u64 = 10_000;
/// Interval (in ms) at which we check new Ethereum blocks.
const ETHEREUM_TICK_INTERVAL_MS: u64 = 5_000;
/// Interval at which we check new Substrate headers when we are synced/almost synced.
const SUBSTRATE_TICK_INTERVAL: Duration = Duration::from_secs(10);
/// Interval at which we check new Ethereum blocks.
const ETHEREUM_TICK_INTERVAL: Duration = Duration::from_secs(5);
/// Max Ethereum headers we want to have in all 'before-submitted' states.
const MAX_FUTURE_HEADERS_TO_DOWNLOAD: usize = 8;
/// Max Ethereum headers count we want to have in 'submitted' state.
@@ -83,12 +85,16 @@ struct SubstrateHeadersSource {
client: substrate_client::Client,
}
type SubstrateFutureOutput<T> = OwnedSourceFutureOutput<SubstrateHeadersSource, SubstrateHeadersSyncPipeline, T>;
impl SourceClient<SubstrateHeadersSyncPipeline> for SubstrateHeadersSource {
type Error = substrate_client::Error;
type BestBlockNumberFuture = Pin<Box<dyn Future<Output = (Self, Result<Number, Self::Error>)>>>;
type HeaderByHashFuture = Pin<Box<dyn Future<Output = (Self, Result<Header, Self::Error>)>>>;
type HeaderByNumberFuture = Pin<Box<dyn Future<Output = (Self, Result<Header, Self::Error>)>>>;
type HeaderExtraFuture = Ready<(Self, Result<(SubstrateHeaderId, ()), Self::Error>)>;
type BestBlockNumberFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<Number>>>>;
type HeaderByHashFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<Header>>>>;
type HeaderByNumberFuture = Pin<Box<dyn Future<Output = SubstrateFutureOutput<Header>>>>;
type HeaderExtraFuture = Ready<SubstrateFutureOutput<(SubstrateHeaderId, ())>>;
type HeaderCompletionFuture =
Pin<Box<dyn Future<Output = SubstrateFutureOutput<(SubstrateHeaderId, Option<GrandpaJustification>)>>>>;
fn best_block_number(self) -> Self::BestBlockNumberFuture {
substrate_client::best_header(self.client)
@@ -111,6 +117,12 @@ impl SourceClient<SubstrateHeadersSyncPipeline> for SubstrateHeadersSource {
fn header_extra(self, id: SubstrateHeaderId, _header: &Header) -> Self::HeaderExtraFuture {
ready((self, Ok((id, ()))))
}
fn header_completion(self, id: SubstrateHeaderId) -> Self::HeaderCompletionFuture {
substrate_client::grandpa_justification(self.client, id)
.map(|(client, result)| (SubstrateHeadersSource { client }, result))
.boxed()
}
}
/// Ethereum client as Substrate headers target.
@@ -123,12 +135,16 @@ struct EthereumHeadersTarget {
sign_params: EthereumSigningParams,
}
type EthereumFutureOutput<T> = OwnedTargetFutureOutput<EthereumHeadersTarget, SubstrateHeadersSyncPipeline, T>;
impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
type Error = ethereum_client::Error;
type BestHeaderIdFuture = Pin<Box<dyn Future<Output = (Self, Result<SubstrateHeaderId, Self::Error>)>>>;
type IsKnownHeaderFuture = Pin<Box<dyn Future<Output = (Self, Result<(SubstrateHeaderId, bool), Self::Error>)>>>;
type RequiresExtraFuture = Ready<(Self, Result<(SubstrateHeaderId, bool), Self::Error>)>;
type SubmitHeadersFuture = Pin<Box<dyn Future<Output = (Self, Result<Vec<SubstrateHeaderId>, Self::Error>)>>>;
type BestHeaderIdFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<SubstrateHeaderId>>>>;
type IsKnownHeaderFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<(SubstrateHeaderId, bool)>>>>;
type RequiresExtraFuture = Ready<EthereumFutureOutput<(SubstrateHeaderId, bool)>>;
type SubmitHeadersFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<Vec<SubstrateHeaderId>>>>>;
type IncompleteHeadersFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<HashSet<SubstrateHeaderId>>>>>;
type CompleteHeadersFuture = Pin<Box<dyn Future<Output = EthereumFutureOutput<SubstrateHeaderId>>>>;
fn best_header_id(self) -> Self::BestHeaderIdFuture {
let (contract, sign_params) = (self.contract, self.sign_params);
@@ -181,6 +197,38 @@ impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
})
.boxed()
}
fn incomplete_headers_ids(self) -> Self::IncompleteHeadersFuture {
let (contract, sign_params) = (self.contract, self.sign_params);
ethereum_client::incomplete_substrate_headers(self.client, contract)
.map(move |(client, result)| {
(
EthereumHeadersTarget {
client,
contract,
sign_params,
},
result,
)
})
.boxed()
}
fn complete_header(self, id: SubstrateHeaderId, completion: GrandpaJustification) -> Self::CompleteHeadersFuture {
let (contract, sign_params) = (self.contract, self.sign_params);
ethereum_client::complete_substrate_header(self.client, sign_params.clone(), contract, id, completion)
.map(move |(client, result)| {
(
EthereumHeadersTarget {
client,
contract,
sign_params,
},
result,
)
})
.boxed()
}
}
/// Run Substrate headers synchronization.
@@ -190,13 +238,13 @@ pub fn run(params: SubstrateSyncParams) {
crate::sync_loop::run(
SubstrateHeadersSource { client: sub_client },
SUBSTRATE_TICK_INTERVAL_MS,
SUBSTRATE_TICK_INTERVAL,
EthereumHeadersTarget {
client: eth_client,
contract: params.eth_contract_address,
sign_params: params.eth_sign,
},
ETHEREUM_TICK_INTERVAL_MS,
ETHEREUM_TICK_INTERVAL,
params.sync_params,
);
}
@@ -34,6 +34,12 @@ pub type Number = bridge_node_runtime::BlockNumber;
/// Substrate header type.
pub type Header = bridge_node_runtime::Header;
/// Substrate signed block type.
pub type SignedBlock = bridge_node_runtime::SignedBlock;
/// GRANDPA justification.
pub type GrandpaJustification = Vec<u8>;
/// Substrate header ID.
pub type SubstrateHeaderId = HeaderId<bridge_node_runtime::Hash, bridge_node_runtime::BlockNumber>;
@@ -53,6 +59,7 @@ impl HeadersSyncPipeline for SubstrateHeadersSyncPipeline {
type Number = bridge_node_runtime::BlockNumber;
type Header = Header;
type Extra = ();
type Completion = GrandpaJustification;
fn estimate_size(source: &QueuedHeader<Self>) -> usize {
source.header().encode().len()
+143 -51
View File
@@ -18,10 +18,14 @@ use crate::sync::HeadersSyncParams;
use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, MaybeConnectionError, QueuedHeader};
use futures::{future::FutureExt, stream::StreamExt};
use num_traits::{Saturating, Zero};
use std::future::Future;
use std::{
collections::HashSet,
future::Future,
time::{Duration, Instant},
};
/// When we submit headers to target node, but see no updates of best
/// source block known to target node during STALL_SYNC_TIMEOUT_MS milliseconds,
/// source block known to target node during STALL_SYNC_TIMEOUT seconds,
/// we consider that our headers are rejected because there has been reorg in target chain.
/// This reorg could invalidate our knowledge about sync process (i.e. we have asked if
/// HeaderA is known to target, but then reorg happened and the answer is different
@@ -30,26 +34,35 @@ use std::future::Future;
/// direct child of previous best header. But: (1) subscription doesn't guarantee that
/// the subscriber will receive every best header (2) reorg won't always lead to sync
/// stall and restart is a heavy operation (we forget all in-memory headers).
const STALL_SYNC_TIMEOUT_MS: u64 = 30_000;
/// Delay (in milliseconds) after we have seen update of best source header at target node,
const STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// Delay after we have seen update of best source header at target node,
/// for us to treat sync stalled. ONLY when relay operates in backup mode.
const BACKUP_STALL_SYNC_TIMEOUT_MS: u64 = 5 * 60_000;
/// Delay (in milliseconds) after connection-related error happened before we'll try
const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Delay after connection-related error happened before we'll try
/// reconnection again.
const CONNECTION_ERROR_DELAY_MS: u64 = 10_000;
const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
/// Type alias for all SourceClient futures.
pub type OwnedSourceFutureOutput<Client, P, T> = (Client, Result<T, <Client as SourceClient<P>>::Error>);
/// Type alias for all TargetClient futures.
pub type OwnedTargetFutureOutput<Client, P, T> = (Client, Result<T, <Client as TargetClient<P>>::Error>);
/// Source client trait.
pub trait SourceClient<P: HeadersSyncPipeline>: Sized {
/// Type of error this clients returns.
type Error: std::fmt::Debug + MaybeConnectionError;
/// Future that returns best block number.
type BestBlockNumberFuture: Future<Output = (Self, Result<P::Number, Self::Error>)>;
type BestBlockNumberFuture: Future<Output = OwnedSourceFutureOutput<Self, P, P::Number>>;
/// Future that returns header by hash.
type HeaderByHashFuture: Future<Output = (Self, Result<P::Header, Self::Error>)>;
type HeaderByHashFuture: Future<Output = OwnedSourceFutureOutput<Self, P, P::Header>>;
/// Future that returns header by number.
type HeaderByNumberFuture: Future<Output = (Self, Result<P::Header, Self::Error>)>;
type HeaderByNumberFuture: Future<Output = OwnedSourceFutureOutput<Self, P, P::Header>>;
/// Future that returns extra data associated with header.
type HeaderExtraFuture: Future<Output = (Self, Result<(HeaderId<P::Hash, P::Number>, P::Extra), Self::Error>)>;
type HeaderExtraFuture: Future<Output = OwnedSourceFutureOutput<Self, P, (HeaderId<P::Hash, P::Number>, P::Extra)>>;
/// Future that returns data required to 'complete' header.
type HeaderCompletionFuture: Future<
Output = OwnedSourceFutureOutput<Self, P, (HeaderId<P::Hash, P::Number>, Option<P::Completion>)>,
>;
/// Get best block number.
fn best_block_number(self) -> Self::BestBlockNumberFuture;
@@ -59,6 +72,8 @@ pub trait SourceClient<P: HeadersSyncPipeline>: Sized {
fn header_by_number(self, number: P::Number) -> Self::HeaderByNumberFuture;
/// Get extra data by header hash.
fn header_extra(self, id: HeaderId<P::Hash, P::Number>, header: &P::Header) -> Self::HeaderExtraFuture;
/// Get completion data by header hash.
fn header_completion(self, id: HeaderId<P::Hash, P::Number>) -> Self::HeaderCompletionFuture;
}
/// Target client trait.
@@ -66,13 +81,19 @@ pub trait TargetClient<P: HeadersSyncPipeline>: Sized {
/// Type of error this clients returns.
type Error: std::fmt::Debug + MaybeConnectionError;
/// Future that returns best header id.
type BestHeaderIdFuture: Future<Output = (Self, Result<HeaderId<P::Hash, P::Number>, Self::Error>)>;
type BestHeaderIdFuture: Future<Output = OwnedTargetFutureOutput<Self, P, HeaderId<P::Hash, P::Number>>>;
/// Future that returns known header check result.
type IsKnownHeaderFuture: Future<Output = (Self, Result<(HeaderId<P::Hash, P::Number>, bool), Self::Error>)>;
type IsKnownHeaderFuture: Future<Output = OwnedTargetFutureOutput<Self, P, (HeaderId<P::Hash, P::Number>, bool)>>;
/// Future that returns extra check result.
type RequiresExtraFuture: Future<Output = (Self, Result<(HeaderId<P::Hash, P::Number>, bool), Self::Error>)>;
type RequiresExtraFuture: Future<Output = OwnedTargetFutureOutput<Self, P, (HeaderId<P::Hash, P::Number>, bool)>>;
/// Future that returns header submission result.
type SubmitHeadersFuture: Future<Output = (Self, Result<Vec<HeaderId<P::Hash, P::Number>>, Self::Error>)>;
type SubmitHeadersFuture: Future<Output = OwnedTargetFutureOutput<Self, P, Vec<HeaderId<P::Hash, P::Number>>>>;
/// Future that returns incomplete headers ids.
type IncompleteHeadersFuture: Future<
Output = OwnedTargetFutureOutput<Self, P, HashSet<HeaderId<P::Hash, P::Number>>>,
>;
/// Future that returns header completion result.
type CompleteHeadersFuture: Future<Output = OwnedTargetFutureOutput<Self, P, HeaderId<P::Hash, P::Number>>>;
/// Returns ID of best header known to the target node.
fn best_header_id(self) -> Self::BestHeaderIdFuture;
@@ -82,23 +103,31 @@ pub trait TargetClient<P: HeadersSyncPipeline>: Sized {
fn requires_extra(self, header: &QueuedHeader<P>) -> Self::RequiresExtraFuture;
/// Submit headers.
fn submit_headers(self, headers: Vec<QueuedHeader<P>>) -> Self::SubmitHeadersFuture;
/// Returns ID of headers that require to be 'completed' before children can be submitted.
fn incomplete_headers_ids(self) -> Self::IncompleteHeadersFuture;
/// Submit completion data for header.
fn complete_header(
self,
id: HeaderId<P::Hash, P::Number>,
completion: P::Completion,
) -> Self::CompleteHeadersFuture;
}
/// Run headers synchronization.
pub fn run<P: HeadersSyncPipeline>(
source_client: impl SourceClient<P>,
source_tick_ms: u64,
source_tick: Duration,
target_client: impl TargetClient<P>,
target_tick_ms: u64,
target_tick: Duration,
sync_params: HeadersSyncParams,
) {
let mut local_pool = futures::executor::LocalPool::new();
let mut progress_context = (std::time::Instant::now(), None, None);
let mut progress_context = (Instant::now(), None, None);
local_pool.run_until(async move {
let mut sync = crate::sync::HeadersSync::<P>::new(sync_params);
let mut stall_countdown = None;
let mut last_update_time = std::time::Instant::now();
let mut last_update_time = Instant::now();
let mut source_maybe_client = None;
let mut source_best_block_number_required = false;
@@ -106,29 +135,36 @@ pub fn run<P: HeadersSyncPipeline>(
let source_new_header_future = futures::future::Fuse::terminated();
let source_orphan_header_future = futures::future::Fuse::terminated();
let source_extra_future = futures::future::Fuse::terminated();
let source_completion_future = futures::future::Fuse::terminated();
let source_go_offline_future = futures::future::Fuse::terminated();
let source_tick_stream = interval(source_tick_ms).fuse();
let source_tick_stream = interval(source_tick).fuse();
let mut target_maybe_client = None;
let mut target_best_block_required = false;
let mut target_incomplete_headers_required = true;
let target_best_block_future = target_client.best_header_id().fuse();
let target_incomplete_headers_future = futures::future::Fuse::terminated();
let target_extra_check_future = futures::future::Fuse::terminated();
let target_existence_status_future = futures::future::Fuse::terminated();
let target_submit_header_future = futures::future::Fuse::terminated();
let target_complete_header_future = futures::future::Fuse::terminated();
let target_go_offline_future = futures::future::Fuse::terminated();
let target_tick_stream = interval(target_tick_ms).fuse();
let target_tick_stream = interval(target_tick).fuse();
futures::pin_mut!(
source_best_block_number_future,
source_new_header_future,
source_orphan_header_future,
source_extra_future,
source_completion_future,
source_go_offline_future,
source_tick_stream,
target_best_block_future,
target_incomplete_headers_future,
target_extra_check_future,
target_existence_status_future,
target_submit_header_future,
target_complete_header_future,
target_go_offline_future,
target_tick_stream
);
@@ -144,7 +180,7 @@ pub fn run<P: HeadersSyncPipeline>(
source_best_block_number,
|source_best_block_number| sync.source_best_header_number_response(source_best_block_number),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client),
|source_client| delay(CONNECTION_ERROR_DELAY, source_client),
|| format!("Error retrieving best header number from {}", P::SOURCE_NAME),
);
},
@@ -155,7 +191,7 @@ pub fn run<P: HeadersSyncPipeline>(
source_new_header,
|source_new_header| sync.headers_mut().header_response(source_new_header),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client),
|source_client| delay(CONNECTION_ERROR_DELAY, source_client),
|| format!("Error retrieving header from {} node", P::SOURCE_NAME),
);
},
@@ -166,7 +202,7 @@ pub fn run<P: HeadersSyncPipeline>(
source_orphan_header,
|source_orphan_header| sync.headers_mut().header_response(source_orphan_header),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client),
|source_client| delay(CONNECTION_ERROR_DELAY, source_client),
|| format!("Error retrieving orphan header from {} node", P::SOURCE_NAME),
);
},
@@ -177,10 +213,21 @@ pub fn run<P: HeadersSyncPipeline>(
source_extra,
|(header, extra)| sync.headers_mut().extra_response(&header, extra),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client),
|source_client| delay(CONNECTION_ERROR_DELAY, source_client),
|| format!("Error retrieving extra data from {} node", P::SOURCE_NAME),
);
},
(source_client, source_completion) = source_completion_future => {
process_future_result(
&mut source_maybe_client,
source_client,
source_completion,
|(header, completion)| sync.headers_mut().completion_response(&header, completion),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY, source_client),
|| format!("Error retrieving completion data from {} node", P::SOURCE_NAME),
);
},
source_client = source_go_offline_future => {
source_maybe_client = Some(source_client);
},
@@ -199,21 +246,20 @@ pub fn run<P: HeadersSyncPipeline>(
|target_best_block| {
let head_updated = sync.target_best_header_response(target_best_block);
if head_updated {
last_update_time = std::time::Instant::now();
last_update_time = Instant::now();
}
match head_updated {
// IF head is updated AND there are still our transactions:
// => restart stall countdown timer
true if sync.headers().headers_in_status(HeaderStatus::Submitted) != 0 =>
stall_countdown = Some(std::time::Instant::now()),
stall_countdown = Some(Instant::now()),
// IF head is updated AND there are no our transactions:
// => stop stall countdown timer
true => stall_countdown = None,
// IF head is not updated AND stall countdown is not yet completed
// => do nothing
false if stall_countdown
.map(|stall_countdown| std::time::Instant::now() - stall_countdown <
std::time::Duration::from_millis(STALL_SYNC_TIMEOUT_MS))
.map(|stall_countdown| stall_countdown.elapsed() < STALL_SYNC_TIMEOUT)
.unwrap_or(true)
=> (),
// IF head is not updated AND stall countdown has completed
@@ -231,10 +277,23 @@ pub fn run<P: HeadersSyncPipeline>(
}
},
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client),
|target_client| delay(CONNECTION_ERROR_DELAY, target_client),
|| format!("Error retrieving best known header from {} node", P::TARGET_NAME),
);
},
(target_client, incomplete_headers_ids) = target_incomplete_headers_future => {
target_incomplete_headers_required = false;
process_future_result(
&mut target_maybe_client,
target_client,
incomplete_headers_ids,
|incomplete_headers_ids| sync.headers_mut().incomplete_headers_response(incomplete_headers_ids),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY, target_client),
|| format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME),
);
},
(target_client, target_existence_status) = target_existence_status_future => {
process_future_result(
&mut target_maybe_client,
@@ -244,7 +303,7 @@ pub fn run<P: HeadersSyncPipeline>(
.headers_mut()
.maybe_orphan_response(&target_header, target_existence_status),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client),
|target_client| delay(CONNECTION_ERROR_DELAY, target_client),
|| format!("Error retrieving existence status from {} node", P::TARGET_NAME),
);
},
@@ -255,10 +314,21 @@ pub fn run<P: HeadersSyncPipeline>(
target_submit_header_result,
|submitted_headers| sync.headers_mut().headers_submitted(submitted_headers),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client),
|target_client| delay(CONNECTION_ERROR_DELAY, target_client),
|| format!("Error submitting headers to {} node", P::TARGET_NAME),
);
},
(target_client, target_complete_header_result) = target_complete_header_future => {
process_future_result(
&mut target_maybe_client,
target_client,
target_complete_header_result,
|completed_header| sync.headers_mut().header_completed(&completed_header),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY, target_client),
|| format!("Error completing headers at {}", P::TARGET_NAME),
);
},
(target_client, target_extra_check_result) = target_extra_check_future => {
process_future_result(
&mut target_maybe_client,
@@ -268,7 +338,7 @@ pub fn run<P: HeadersSyncPipeline>(
.headers_mut()
.maybe_extra_response(&header, extra_check_result),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client),
|target_client| delay(CONNECTION_ERROR_DELAY, target_client),
|| format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME),
);
},
@@ -277,6 +347,7 @@ pub fn run<P: HeadersSyncPipeline>(
},
_ = target_tick_stream.next() => {
target_best_block_required = true;
target_incomplete_headers_required = true;
},
}
@@ -287,13 +358,26 @@ pub fn run<P: HeadersSyncPipeline>(
if let Some(target_client) = target_maybe_client.take() {
// the priority is to:
// 1) get best block - it stops us from downloading/submitting new blocks + we call it rarely;
// 2) check if we need extra data from source - it stops us from downloading/submitting new blocks;
// 3) check existence - it stops us from submitting new blocks;
// 4) submit header
// 2) get incomplete headers - it stops us from submitting new blocks + we call it rarely;
// 3) complete headers - it stops us from submitting new blocks;
// 4) check if we need extra data from source - it stops us from downloading/submitting new blocks;
// 5) check existence - it stops us from submitting new blocks;
// 6) submit header
if target_best_block_required {
log::debug!(target: "bridge", "Asking {} about best block", P::TARGET_NAME);
target_best_block_future.set(target_client.best_header_id().fuse());
} else if target_incomplete_headers_required {
log::debug!(target: "bridge", "Asking {} about incomplete headers", P::TARGET_NAME);
target_incomplete_headers_future.set(target_client.incomplete_headers_ids().fuse());
} else if let Some((id, completion)) = sync.headers_mut().header_to_complete() {
log::debug!(
target: "bridge",
"Going to complete header: {:?}",
id,
);
target_complete_header_future.set(target_client.complete_header(id, completion.clone()).fuse());
} else if let Some(header) = sync.headers().header(HeaderStatus::MaybeExtra) {
log::debug!(
target: "bridge",
@@ -314,9 +398,9 @@ pub fn run<P: HeadersSyncPipeline>(
);
target_existence_status_future.set(target_client.is_known_header(parent_id).fuse());
} else if let Some(headers) = sync.select_headers_to_submit(
last_update_time.elapsed() > std::time::Duration::from_millis(BACKUP_STALL_SYNC_TIMEOUT_MS),
) {
} else if let Some(headers) =
sync.select_headers_to_submit(last_update_time.elapsed() > BACKUP_STALL_SYNC_TIMEOUT)
{
let ids = match headers.len() {
1 => format!("{:?}", headers[0].id()),
2 => format!("[{:?}, {:?}]", headers[0].id(), headers[1].id()),
@@ -335,7 +419,7 @@ pub fn run<P: HeadersSyncPipeline>(
// remember that we have submitted some headers
if stall_countdown.is_none() {
stall_countdown = Some(std::time::Instant::now());
stall_countdown = Some(Instant::now());
}
} else {
target_maybe_client = Some(target_client);
@@ -346,13 +430,21 @@ pub fn run<P: HeadersSyncPipeline>(
if let Some(source_client) = source_maybe_client.take() {
// the priority is to:
// 1) get best block - it stops us from downloading new blocks + we call it rarely;
// 2) download extra data - it stops us from submitting new blocks;
// 3) download missing headers - it stops us from downloading/submitting new blocks;
// 4) downloading new headers
// 2) download completion data - it stops us from submitting new blocks;
// 3) download extra data - it stops us from submitting new blocks;
// 4) download missing headers - it stops us from downloading/submitting new blocks;
// 5) downloading new headers
if source_best_block_number_required {
log::debug!(target: "bridge", "Asking {} node about best block number", P::SOURCE_NAME);
source_best_block_number_future.set(source_client.best_block_number().fuse());
} else if let Some(id) = sync.headers_mut().incomplete_header() {
log::debug!(
target: "bridge",
"Retrieving completion data for header: {:?}",
id,
);
source_completion_future.set(source_client.header_completion(id).fuse());
} else if let Some(header) = sync.headers().header(HeaderStatus::Extra) {
let id = header.id();
log::debug!(
@@ -402,15 +494,15 @@ pub fn run<P: HeadersSyncPipeline>(
}
/// Future that resolves into given value after given timeout.
async fn delay<T>(timeout_ms: u64, retval: T) -> T {
async_std::task::sleep(std::time::Duration::from_millis(timeout_ms)).await;
async fn delay<T>(timeout: Duration, retval: T) -> T {
async_std::task::sleep(timeout).await;
retval
}
/// Stream that emits item every `timeout_ms` milliseconds.
fn interval(timeout_ms: u64) -> impl futures::Stream<Item = ()> {
fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
futures::stream::unfold((), move |_| async move {
delay(timeout_ms, ()).await;
delay(timeout, ()).await;
Some(((), ()))
})
}
@@ -447,14 +539,14 @@ fn process_future_result<TClient, TResult, TError, TGoOfflineFuture>(
/// Print synchronization progress.
fn print_sync_progress<P: HeadersSyncPipeline>(
progress_context: (std::time::Instant, Option<P::Number>, Option<P::Number>),
progress_context: (Instant, Option<P::Number>, Option<P::Number>),
eth_sync: &crate::sync::HeadersSync<P>,
) -> (std::time::Instant, Option<P::Number>, Option<P::Number>) {
) -> (Instant, Option<P::Number>, Option<P::Number>) {
let (prev_time, prev_best_header, prev_target_header) = progress_context;
let now_time = std::time::Instant::now();
let now_time = Instant::now();
let (now_best_header, now_target_header) = eth_sync.status();
let need_update = now_time - prev_time > std::time::Duration::from_secs(10)
let need_update = now_time - prev_time > Duration::from_secs(10)
|| match (prev_best_header, now_best_header) {
(Some(prev_best_header), Some(now_best_header)) => {
now_best_header.0.saturating_sub(prev_best_header) > 10.into()
+20 -3
View File
@@ -15,7 +15,7 @@
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
/// Ethereum header Id.
#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
pub struct HeaderId<Hash, Number>(pub Number, pub Hash);
/// Ethereum header synchronization status.
@@ -33,6 +33,8 @@ pub enum HeaderStatus {
Extra,
/// Header is in Ready queue.
Ready,
/// Header is in Incomplete queue.
Incomplete,
/// Header has been recently submitted to the target node.
Submitted,
/// Header is known to the target node.
@@ -61,6 +63,7 @@ pub trait HeadersSyncPipeline: Clone + Copy {
+ Copy
+ std::fmt::Debug
+ std::fmt::Display
+ std::hash::Hash
+ std::ops::Add<Output = Self::Number>
+ std::ops::Sub<Output = Self::Number>
+ num_traits::Saturating
@@ -68,10 +71,24 @@ pub trait HeadersSyncPipeline: Clone + Copy {
+ num_traits::One;
/// Type of header that we're syncing.
type Header: Clone + std::fmt::Debug + SourceHeader<Self::Hash, Self::Number>;
/// Type of extra data for the header that we're receiving from the source node.
/// Type of extra data for the header that we're receiving from the source node:
/// 1) extra data is required for some headers;
/// 2) target node may answer if it'll require extra data before header is submitted;
/// 3) extra data available since the header creation time;
/// 4) header and extra data are submitted in single transaction.
///
/// Example: Ethereum transactions receipts.
type Extra: Clone + std::fmt::Debug;
/// Type of data required to 'complete' header that we're receiving from the source node:
/// 1) completion data is required for some headers;
/// 2) target node can't answer if it'll require completion data before header is accepted;
/// 3) completion data may be generated after header generation;
/// 4) header and completion data are submitted in separate transactions.
///
/// Example: Substrate GRANDPA justifications.
type Completion: Clone + std::fmt::Debug;
/// Function used to convert from queued header to target header.
/// Function used to estimate size of target-encoded header.
fn estimate_size(source: &QueuedHeader<Self>) -> usize;
}