diff --git a/bridges/modules/ethereum-contract/builtin/Cargo.toml b/bridges/modules/ethereum-contract/builtin/Cargo.toml index 83fa872865..7a38e4497f 100644 --- a/bridges/modules/ethereum-contract/builtin/Cargo.toml +++ b/bridges/modules/ethereum-contract/builtin/Cargo.toml @@ -10,11 +10,36 @@ edition = "2018" # General dependencies codec = { package = "parity-scale-codec", version = "1.0.0" } -finality-grandpa = { version = "0.12.2", features = ["derive-codec"] } -sp-blockchain = "2.0.0-alpha.5" -sp-finality-grandpa = "2.0.0-alpha.5" -sp-runtime = "2.0.0-alpha.5" +ethereum-types = "0.9.1" # Runtime/chain specific dependencies bridge-node-runtime = { path = "../../../bin/node/runtime" } + +[dependencies.sp-blockchain] +version = "2.0.0-alpha.6" +rev = "c13ad41634d0bd7cf07897c2aa062b917d520520" +git = "https://github.com/paritytech/substrate/" + +[dependencies.sp-finality-grandpa] +version = "2.0.0-alpha.6" +rev = "c13ad41634d0bd7cf07897c2aa062b917d520520" +git = "https://github.com/paritytech/substrate/" + +[dependencies.sp-runtime] +version = "2.0.0-alpha.6" +rev = "c13ad41634d0bd7cf07897c2aa062b917d520520" +git = "https://github.com/paritytech/substrate/" + +[dependencies.sc-finality-grandpa] +version = "0.8.0-dev" +rev = "c13ad41634d0bd7cf07897c2aa062b917d520520" +git = "https://github.com/paritytech/substrate/" + +[dev-dependencies] +hex = "0.4" + +[dev-dependencies.sp-core] +version = "2.0.0-alpha.6" +rev = "c13ad41634d0bd7cf07897c2aa062b917d520520" +git = "https://github.com/paritytech/substrate/" diff --git a/bridges/modules/ethereum-contract/builtin/src/lib.rs b/bridges/modules/ethereum-contract/builtin/src/lib.rs index 3249307ac9..739e14d69e 100644 --- a/bridges/modules/ethereum-contract/builtin/src/lib.rs +++ b/bridges/modules/ethereum-contract/builtin/src/lib.rs @@ -14,17 +14,21 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use bridge_node_runtime::{BlockNumber, Hash, Header as RuntimeHeader}; -use codec::Decode; +use bridge_node_runtime::{Block, BlockNumber, Hash, Header as RuntimeHeader}; +use codec::{Decode, Encode}; +use ethereum_types::U256; use sp_blockchain::Error as ClientError; +use sp_finality_grandpa::{AuthorityList, ConsensusLog, GRANDPA_ENGINE_ID}; /// Builtin errors. #[derive(Debug)] pub enum Error { + /// Failed to decode block number. + BlockNumberDecode, /// Failed to decode Substrate header. HeaderDecode(codec::Error), /// Failed to decode best voters set. - BestVotersDecode(codec::Error), + BestSetDecode(codec::Error), /// Failed to decode finality proof. FinalityProofDecode(codec::Error), /// Failed to verify justification. @@ -32,7 +36,7 @@ pub enum Error { } /// Substrate header. -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub struct Header { /// Header hash. pub hash: Hash, @@ -45,7 +49,7 @@ pub struct Header { } /// GRANDPA validators set change signal. -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub struct ValidatorsSetSignal { /// Signal delay. pub delay: BlockNumber, @@ -53,6 +57,20 @@ pub struct ValidatorsSetSignal { pub validators: Vec, } +/// Convert from U256 to BlockNumber. Fails if `U256` value isn't fitting within `BlockNumber` +/// limits (the runtime referenced by this module uses u32 as `BlockNumber`). +pub fn to_substrate_block_number(number: U256) -> Result { + match number == number.low_u32().into() { + true => Ok(number.low_u32()), + false => Err(Error::BlockNumberDecode), + } +} + +/// Convert from BlockNumber to U256. +pub fn from_substrate_block_number(number: BlockNumber) -> Result { + Ok(U256::from(number as u64)) +} + /// Parse Substrate header. pub fn parse_substrate_header(raw_header: &[u8]) -> Result { RuntimeHeader::decode(&mut &raw_header[..]) @@ -60,20 +78,243 @@ pub fn parse_substrate_header(raw_header: &[u8]) -> Result { hash: header.hash(), parent_hash: header.parent_hash, number: header.number, - signal: None, // TODO: parse me + signal: sp_runtime::traits::Header::digest(&header) + .log(|log| { + log.as_consensus().and_then(|(engine_id, log)| { + if engine_id == GRANDPA_ENGINE_ID { + Some(log) + } else { + None + } + }) + }) + .and_then(|log| ConsensusLog::decode(&mut &log[..]).ok()) + .and_then(|log| match log { + ConsensusLog::ScheduledChange(scheduled_change) => Some(ValidatorsSetSignal { + delay: scheduled_change.delay, + validators: scheduled_change.next_authorities.encode(), + }), + _ => None, + }), }) .map_err(Error::HeaderDecode) } /// Verify GRANDPA finality proof. pub fn verify_substrate_finality_proof( - _best_set_id: u64, - _raw_best_voters: &[u8], - _raw_best_header: &[u8], - _raw_headers: &[&[u8]], - _raw_finality_proof: &[u8], -) -> Result<(usize, usize), Error> { - Err(Error::JustificationVerify(ClientError::Msg( - "Not yet implemented".into(), - ))) // TODO: implement me + finality_target_number: BlockNumber, + finality_target_hash: Hash, + best_set_id: u64, + raw_best_set: &[u8], + raw_finality_proof: &[u8], +) -> Result<(), Error> { + let best_set = AuthorityList::decode(&mut &raw_best_set[..]).map_err(Error::BestSetDecode)?; + sc_finality_grandpa::GrandpaJustification::::decode_and_verify_finalizes( + &raw_finality_proof, + (finality_target_hash, finality_target_number), + best_set_id, + &best_set.into_iter().collect(), + ) + .map_err(Error::JustificationVerify) + .map(|_| ()) +} + +#[cfg(test)] +mod tests { + use super::*; + use bridge_node_runtime::DigestItem; + use sp_core::crypto::Public; + use sp_finality_grandpa::{AuthorityId, ScheduledChange}; + use sp_runtime::generic::Digest; + + #[test] + fn to_substrate_block_number_succeeds() { + assert_eq!(to_substrate_block_number(U256::zero()).unwrap(), 0); + assert_eq!( + to_substrate_block_number(U256::from(std::u32::MAX as u64)).unwrap(), + 0xFFFFFFFF + ); + } + + #[test] + fn to_substrate_block_number_fails() { + assert!(matches!( + to_substrate_block_number(U256::from(std::u32::MAX as u64 + 1)), + Err(Error::BlockNumberDecode) + )); + } + + #[test] + fn from_substrate_block_number_succeeds() { + assert_eq!(from_substrate_block_number(0).unwrap(), U256::zero()); + assert_eq!( + from_substrate_block_number(std::u32::MAX).unwrap(), + U256::from(std::u32::MAX) + ); + } + + #[test] + fn substrate_header_without_signal_parsed() { + let raw_header = RuntimeHeader { + parent_hash: [0u8; 32].into(), + number: 0, + state_root: "b2fc47904df5e355c6ab476d89fbc0733aeddbe302f0b94ba4eea9283f7e89e7" + .parse() + .unwrap(), + extrinsics_root: "03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314" + .parse() + .unwrap(), + digest: Default::default(), + } + .encode(); + assert_eq!( + raw_header, + hex::decode("000000000000000000000000000000000000000000000000000000000000000000b2fc47904df5e355c6ab476d89fbc0733aeddbe302f0b94ba4eea9283f7e89e703170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c11131400").unwrap(), + ); + + assert_eq!( + parse_substrate_header(&raw_header).unwrap(), + Header { + hash: "afbbeb92bf6ff14f60bdef0aa89f043dd403659ae82665238810ace0d761f6d0" + .parse() + .unwrap(), + parent_hash: Default::default(), + number: 0, + signal: None, + }, + ); + } + + #[test] + fn substrate_header_with_signal_parsed() { + let authorities = vec![ + (AuthorityId::from_slice(&[1; 32]), 101), + (AuthorityId::from_slice(&[3; 32]), 103), + ]; + let mut digest = Digest::default(); + digest.push(DigestItem::Consensus( + GRANDPA_ENGINE_ID, + ConsensusLog::ScheduledChange(ScheduledChange { + next_authorities: authorities.clone(), + delay: 8, + }) + .encode(), + )); + + let raw_header = RuntimeHeader { + parent_hash: "c0ac300d4005141ea690f3df593e049739c227316eb7f05052f3ee077388b68b" + .parse() + .unwrap(), + number: 8, + state_root: "822d6b412033aa9ac8e1722918eec5f25633529225754b3d4149982f5cacd4aa" + .parse() + .unwrap(), + extrinsics_root: "e7b07c0ce2799416ce7877b9cefc7f596bea5e8813bb2a0abf760414073ca928" + .parse() + .unwrap(), + digest, + } + .encode(); + assert_eq!( + raw_header, + hex::decode("c0ac300d4005141ea690f3df593e049739c227316eb7f05052f3ee077388b68b20822d6b412033aa9ac8e1722918eec5f25633529225754b3d4149982f5cacd4aae7b07c0ce2799416ce7877b9cefc7f596bea5e8813bb2a0abf760414073ca928040446524e4b59010108010101010101010101010101010101010101010101010101010101010101010165000000000000000303030303030303030303030303030303030303030303030303030303030303670000000000000008000000").unwrap(), + ); + + assert_eq!( + parse_substrate_header(&raw_header).unwrap(), + Header { + hash: "3dfebb280bd87a4640f89d7f2adecd62b88148747bff5b63af6e1634ee37a56e" + .parse() + .unwrap(), + parent_hash: "c0ac300d4005141ea690f3df593e049739c227316eb7f05052f3ee077388b68b" + .parse() + .unwrap(), + number: 8, + signal: Some(ValidatorsSetSignal { + delay: 8, + validators: authorities.encode(), + }), + }, + ); + } + + /// Number of the example block with justification. + const EXAMPLE_JUSTIFIED_BLOCK_NUMBER: u32 = 8; + /// Hash of the example block with justification. + const EXAMPLE_JUSTIFIED_BLOCK_HASH: &'static str = + "a2f45892db86b2ad133ce57d81b7e4375bb7035ce9883e6b68c358164f343775"; + /// Id of authorities set that have generated example justification. Could be computed by tracking + /// every set change in canonized headers. + const EXAMPLE_AUTHORITIES_SET_ID: u64 = 0; + /// Encoded authorities set that has generated example justification. Could be fetched from `ScheduledChange` + /// digest of the block that has scheduled this set OR by calling `GrandpaApi::grandpa_authorities()` at + /// appropriate block. + const EXAMPLE_AUTHORITIES_SET: &'static str = "1488dc3417d5058ec4b4503e0c12ea1a0a89be200fe98922423d4334014fa6b0ee0100000000000000d17c2d7823ebf260fd138f2d7e27d114c0145d968b5ff5006125f2414fadae690100000000000000439660b36c6c03afafca027b910b4fecf99801834c62a5e6006f27d978de234f01000000000000005e639b43e0052c47447dac87d6fd2b6ec50bdd4d0f614e4299c665249bbd09d901000000000000001dfe3e22cc0d45c70779c1095f7489a8ef3cf52d62fbd8c2fa38c9f1723502b50100000000000000"; + /// Example justification. Could be fetched by calling 'chain_getBlock' RPC. + const EXAMPLE_JUSTIFICATION: &'static str = "2600000000000000a2f45892db86b2ad133ce57d81b7e4375bb7035ce9883e6b68c358164f3437750800000010a2f45892db86b2ad133ce57d81b7e4375bb7035ce9883e6b68c358164f34377508000000d66b4ceb57ef8bcbc955071b597c8c5d2adcfdbb009c73f8438d342670fdeca9ac60686cbd58105b10f51d0a64a8e73b2e5829b2eab3248a008c472852130b00439660b36c6c03afafca027b910b4fecf99801834c62a5e6006f27d978de234fa2f45892db86b2ad133ce57d81b7e4375bb7035ce9883e6b68c358164f34377508000000f5730c14d3cd22b7661e2f5fcb3139dd5fef37f946314a441d01b40ce1200ef70d810525f23fd278b588cd67473c200bda83c338c407b479386aa83798e5970b5e639b43e0052c47447dac87d6fd2b6ec50bdd4d0f614e4299c665249bbd09d9a2f45892db86b2ad133ce57d81b7e4375bb7035ce9883e6b68c358164f34377508000000c78d6ec463f476461a695b4791d30e7626d16fdf72d7c252c2cad387495a97e8c2827ed4d5af853d6e05d31cb6fb7438c9481a7e9c6990d60a9bfaf6a6e1930988dc3417d5058ec4b4503e0c12ea1a0a89be200fe98922423d4334014fa6b0eea2f45892db86b2ad133ce57d81b7e4375bb7035ce9883e6b68c358164f3437750800000052b4fc52d430286b3e2d650aa6e01b6ff4fae8b968893a62be789209eb97ee6e23780d3f5af7042d85bb48f1b202890b22724dfebce138826f66a5e00324320fd17c2d7823ebf260fd138f2d7e27d114c0145d968b5ff5006125f2414fadae6900"; + + #[test] + fn substrate_header_parse_fails() { + assert!(matches!(parse_substrate_header(&[]), Err(_))); + } + + #[test] + fn verify_substrate_finality_proof_succeeds() { + verify_substrate_finality_proof( + EXAMPLE_JUSTIFIED_BLOCK_NUMBER, + EXAMPLE_JUSTIFIED_BLOCK_HASH.parse().unwrap(), + EXAMPLE_AUTHORITIES_SET_ID, + &hex::decode(EXAMPLE_AUTHORITIES_SET).unwrap(), + &hex::decode(EXAMPLE_JUSTIFICATION).unwrap(), + ) + .unwrap(); + } + + #[test] + fn verify_substrate_finality_proof_fails_when_wrong_block_is_finalized() { + verify_substrate_finality_proof( + 4, + Default::default(), + EXAMPLE_AUTHORITIES_SET_ID, + &hex::decode(EXAMPLE_AUTHORITIES_SET).unwrap(), + &hex::decode(EXAMPLE_JUSTIFICATION).unwrap(), + ) + .unwrap_err(); + } + + #[test] + fn verify_substrate_finality_proof_fails_when_wrong_set_is_provided() { + verify_substrate_finality_proof( + EXAMPLE_JUSTIFIED_BLOCK_NUMBER, + EXAMPLE_JUSTIFIED_BLOCK_HASH.parse().unwrap(), + EXAMPLE_AUTHORITIES_SET_ID, + &hex::decode("deadbeef").unwrap(), + &hex::decode(EXAMPLE_JUSTIFICATION).unwrap(), + ) + .unwrap_err(); + } + + #[test] + fn verify_substrate_finality_proof_fails_when_wrong_set_id_is_provided() { + verify_substrate_finality_proof( + EXAMPLE_JUSTIFIED_BLOCK_NUMBER, + EXAMPLE_JUSTIFIED_BLOCK_HASH.parse().unwrap(), + 42, + &hex::decode(EXAMPLE_AUTHORITIES_SET).unwrap(), + &hex::decode(EXAMPLE_JUSTIFICATION).unwrap(), + ) + .unwrap_err(); + } + + #[test] + fn verify_substrate_finality_proof_fails_when_wrong_proof_is_provided() { + verify_substrate_finality_proof( + EXAMPLE_JUSTIFIED_BLOCK_NUMBER, + EXAMPLE_JUSTIFIED_BLOCK_HASH.parse().unwrap(), + 0, + &hex::decode(EXAMPLE_AUTHORITIES_SET).unwrap(), + &hex::decode("deadbeef").unwrap(), + ) + .unwrap_err(); + } } diff --git a/bridges/relays/ethereum/res/substrate-bridge-abi.json b/bridges/relays/ethereum/res/substrate-bridge-abi.json index 4ff57ab656..bd65f6b792 100644 --- a/bridges/relays/ethereum/res/substrate-bridge-abi.json +++ b/bridges/relays/ethereum/res/substrate-bridge-abi.json @@ -79,19 +79,18 @@ "type": "function" }, { - "inputs": [ - { - "internalType": "bytes32", - "name": "headerHash", - "type": "bytes32" - } - ], - "name": "isFinalityProofRequired", + "inputs": [], + "name": "incompleteHeaders", "outputs": [ { - "internalType": "bool", + "internalType": "uint256[]", "name": "", - "type": "bool" + "type": "uint256[]" + }, + { + "internalType": "bytes32[]", + "name": "", + "type": "bytes32[]" } ], "stateMutability": "view", diff --git a/bridges/relays/ethereum/res/substrate-bridge-bytecode.hex b/bridges/relays/ethereum/res/substrate-bridge-bytecode.hex index 54c252b65d..6b3b74df48 100644 --- a/bridges/relays/ethereum/res/substrate-bridge-bytecode.hex +++ b/bridges/relays/ethereum/res/substrate-bridge-bytecode.hex @@ -1 +1 @@ -60806040523480156200001157600080fd5b5060405162000e6738038062000e67833981810160405260608110156200003757600080fd5b81019080805160405193929190846401000000008211156200005857600080fd5b9083019060208201858111156200006e57600080fd5b82516401000000008111828201881017156200008957600080fd5b82525081516020918201929091019080838360005b83811015620000b85781810151838201526020016200009e565b50505050905090810190601f168015620000e65780820380516001836020036101000a031916815260200191505b506040818152602083015192018051929491939192846401000000008211156200010f57600080fd5b9083019060208201858111156200012557600080fd5b82516401000000008111828201881017156200014057600080fd5b82525081516020918201929091019080838360005b838110156200016f57818101518382015260200162000155565b50505050905090810190601f1680156200019d5780820380516001836020036101000a031916815260200191505b50604052505050620001ae620003d4565b620001c2846001600160e01b03620002dc16565b805160008181556002918255604080840180516001908155825160e08101845281815260208088015181830190815293518286019081526080808a0151606085019081526001600160401b038e169185019190915260a0840188905260c084018890528951885260058352959096208251815460ff191690151517815593519284019290925593519482019490945590518051949550919390926200026f92600385019291019062000409565b506080820151600482810180546001600160401b03199081166001600160401b039485161790915560a0850151600585015560c09094015160069093019290925560038054909316908616179091558251620002d19190602085019062000409565b5050505050620004ae565b620002e6620003d4565b600080600080600060608751602089016040516020810160208101602081016020810160a08588886010600019fa6200031e57600080fd5b93519251915190519351929b50909950975090955093505082159050620003a757816001600160401b03811180156200035657600080fd5b506040519080825280601f01601f19166020018201604052801562000382576020820181803683370190505b50905087516020890160208301848184846011600019fa620003a357600080fd5b5050505b6040805160a081018252968752602087019590955293850192909252606084015250608082015292915050565b6040518060a0016040528060008019168152602001600080191681526020016000815260200160008152602001606081525090565b828054600181600116156101000203166002900490600052602060002090601f016020900481019282601f106200044c57805160ff19168380011785556200047c565b828001600101855582156200047c579182015b828111156200047c5782518255916020019190600101906200045f565b506200048a9291506200048e565b5090565b620004ab91905b808211156200048a576000815560010162000495565b90565b6109a980620004be6000396000f3fe608060405234801561001057600080fd5b50600436106100575760003560e01c8063871ebe181461005c578063a98bfaad1461008d578063d96a2deb146100aa578063e7af0779146100cb578063fae71ae814610173575b600080fd5b6100796004803603602081101561007257600080fd5b5035610225565b604080519115158252519081900360200190f35b610079600480360360208110156100a357600080fd5b503561023a565b6100b2610297565b6040805192835260208301919091528051918290030190f35b610171600480360360208110156100e157600080fd5b8101906020810181356401000000008111156100fc57600080fd5b82018360208201111561010e57600080fd5b8035906020019184600183028401116401000000008311171561013057600080fd5b91908080601f0160208091040260200160405190810160405280939291908181526020018383808284376000920191909152509295506102af945050505050565b005b6101716004803603606081101561018957600080fd5b8135916020810135918101906060810160408201356401000000008111156101b057600080fd5b8201836020820111156101c257600080fd5b803590602001918460018302840111640100000000831117156101e457600080fd5b91908080601f01602080910402602001604051908101604052809392919081815260200183838082843760009201919091525092955061058e945050505050565b60009081526005602052604090205460ff1690565b6000818152600560205260408120805460ff16801561025e57506001548160020154115b8015610271575080600601548160020154145b80156102905750600354600482015467ffffffffffffffff9081169116145b9392505050565b60008054808252600560205260409091206002015491565b6102b76107a8565b6102c0826106ac565b805160009081526005602052604090205490915060ff1615610329576040805162461bcd60e51b815260206004820152601760248201527f48656164657220697320616c7265616479206b6e6f776e000000000000000000604482015290519081900360640190fd5b6020808201516000908152600590915260409020805460ff168015610358575060018260400151038160020154145b6103935760405162461bcd60e51b81526004018080602001828103825260268152602001806108ee6026913960400191505060405180910390fd5b6006810154158015906103ad575080600201548160060154145b156103f6578160200151600254146103f65760405162461bcd60e51b81526004018080602001828103825260318152602001806109436031913960400191505060405180910390fd5b60068101546080830151511561046b578260400151811061045e576040805162461bcd60e51b815260206004820152601960248201527f4f7665726c617070696e67207369676e616c7320666f756e6400000000000000604482015290519081900360640190fd5b5060608201516040830151015b60048201546005830154600384015467ffffffffffffffff9092169160026000196001831615610100020190911604156104ac575060208401516001909101905b6040805160e0810182526001808252602088810151818401908152898501518486019081526080808c01516060870190815267ffffffffffffffff8a169187019190915260a0860188905260c086018a90528b51600090815260058552969096208551815460ff191690151517815591519382019390935591516002830155925180519293919261054392600385019201906107dd565b50608082015160048201805467ffffffffffffffff191667ffffffffffffffff90921691909117905560a0820151600582015560c09091015160069091015550509151600055505050565b60008281526005602052604090206002015483146105dd5760405162461bcd60e51b815260040180806020018281038252602f815260200180610914602f913960400191505060405180910390fd5b60025460006105ed85858561079e565b600081815260056020526040902060028281558101546001559091505b8282146106a457506000908152600560205260409020600181015460068201546002830154919291141561069f576005818101546000908152602091909152604090206003805467ffffffffffffffff198116600167ffffffffffffffff9283168101909216178255908201805461069892600492916002610100928216159290920260001901160461085b565b50506106a4565b61060a565b505050505050565b6106b46107a8565b600080600080600060608751602089016040516020810160208101602081016020810160a08588886010600019fa6106eb57600080fd5b93519251915190519351929b50909950975090955093505082159050610771578167ffffffffffffffff8111801561072257600080fd5b506040519080825280601f01601f19166020018201604052801561074d576020820181803683370190505b50905087516020890160208301848184846011600019fa61076d57600080fd5b5050505b6040805160a081018252968752602087019590955293850192909252606084015250608082015292915050565b6002549392505050565b6040518060a0016040528060008019168152602001600080191681526020016000815260200160008152602001606081525090565b828054600181600116156101000203166002900490600052602060002090601f016020900481019282601f1061081e57805160ff191683800117855561084b565b8280016001018555821561084b579182015b8281111561084b578251825591602001919060010190610830565b506108579291506108d0565b5090565b828054600181600116156101000203166002900490600052602060002090601f016020900481019282601f10610894578054855561084b565b8280016001018555821561084b57600052602060002091601f016020900482015b8281111561084b5782548255916001019190600101906108b5565b6108ea91905b8082111561085757600081556001016108d6565b9056fe4d697373696e6720706172656e74206865616465722066726f6d207468652073746f726167654d697373696e672066696e616c69747920746172676574206865616465722066726f6d207468652073746f726167654d697373696e672072657175697265642066696e616c6974792070726f6f6620666f7220706172656e7420686561646572a26469706673582212206cc35a10288e85e37e14250b2d4af37dc4fbcd59635965d9ac0421cb8db1f66c64736f6c63430006060033 \ No newline at end of file +60806040523480156200001157600080fd5b506040516200131938038062001319833981810160405260608110156200003757600080fd5b81019080805160405193929190846401000000008211156200005857600080fd5b9083019060208201858111156200006e57600080fd5b82516401000000008111828201881017156200008957600080fd5b82525081516020918201929091019080838360005b83811015620000b85781810151838201526020016200009e565b50505050905090810190601f168015620000e65780820380516001836020036101000a031916815260200191505b506040818152602083015192018051929491939192846401000000008211156200010f57600080fd5b9083019060208201858111156200012557600080fd5b82516401000000008111828201881017156200014057600080fd5b82525081516020918201929091019080838360005b838110156200016f57818101518382015260200162000155565b50505050905090810190601f1680156200019d5780820380516001836020036101000a031916815260200191505b50604052505050620001ae620003d5565b620001c2846001600160e01b03620002dc16565b805160008181556002918255604080840180516001908155825160e08101845281815260208088015181830190815293518286019081526080808a0151606085019081526001600160401b038e169185019190915260a0840188905260c084018890528951885260078352959096208251815460ff191690151517815593519284019290925593519482019490945590518051949550919390926200026f9260038501929101906200040a565b506080820151600482810180546001600160401b03199081166001600160401b039485161790915560a0850151600585015560c09094015160069093019290925560038054909316908616179091558251620002d1919060208501906200040a565b5050505050620004af565b620002e6620003d5565b60008060008060008651602088016040516020810160208101602081016020810160a08588886010600019fa6200031c57600080fd5b84519b5083519a50825199508151985080519750505050505050506060816001600160401b03811180156200035057600080fd5b506040519080825280601f01601f1916602001820160405280156200037c576020820181803683370190505b5090508115620003a85787516020890160208301848184846011600019fa620003a457600080fd5b5050505b6040805160a081018252968752602087019590955293850192909252606084015250608082015292915050565b6040518060a0016040528060008019168152602001600080191681526020016000815260200160008152602001606081525090565b828054600181600116156101000203166002900490600052602060002090601f016020900481019282601f106200044d57805160ff19168380011785556200047d565b828001600101855582156200047d579182015b828111156200047d57825182559160200191906001019062000460565b506200048b9291506200048f565b5090565b620004ac91905b808211156200048b576000815560010162000496565b90565b610e5a80620004bf6000396000f3fe608060405234801561001057600080fd5b50600436106100575760003560e01c8063374c2c261461005c578063871ebe18146100fd578063d96a2deb1461012e578063e7af07791461014f578063fae71ae8146101f7575b600080fd5b6100646102a9565b604051808060200180602001838103835285818151815260200191508051906020019060200280838360005b838110156100a8578181015183820152602001610090565b50505050905001838103825284818151815260200191508051906020019060200280838360005b838110156100e75781810151838201526020016100cf565b5050505090500194505050505060405180910390f35b61011a6004803603602081101561011357600080fd5b50356103ae565b604080519115158252519081900360200190f35b6101366103c3565b6040805192835260208301919091528051918290030190f35b6101f56004803603602081101561016557600080fd5b81019060208101813564010000000081111561018057600080fd5b82018360208201111561019257600080fd5b803590602001918460018302840111640100000000831117156101b457600080fd5b91908080601f0160208091040260200160405190810160405280939291908181526020018383808284376000920191909152509295506103db945050505050565b005b6101f56004803603606081101561020d57600080fd5b81359160208101359181019060608101604082013564010000000081111561023457600080fd5b82018360208201111561024657600080fd5b8035906020019184600183028401116401000000008311171561026857600080fd5b91908080601f016020809104026020016040519081016040528093929190818152602001838380828437600092019190915250929550610779945050505050565b6005546060908190818167ffffffffffffffff811180156102c957600080fd5b506040519080825280602002602001820160405280156102f3578160200160208202803683370190505b50905060005b8281101561034e57600760006005838154811061031257fe5b906000526020600020015481526020019081526020016000206002015482828151811061033b57fe5b60209081029190910101526001016102f9565b508060058080548060200260200160405190810160405280929190818152602001828054801561039d57602002820191906000526020600020905b815481526020019060010190808311610389575b505050505090509350935050509091565b60009081526007602052604090205460ff1690565b60008054808252600760205260409091206002015491565b6103e3610c13565b6103ec826109de565b805160009081526007602052604090205490915060ff1615610455576040805162461bcd60e51b815260206004820152601760248201527f48656164657220697320616c7265616479206b6e6f776e000000000000000000604482015290519081900360640190fd5b6001548160400151116104995760405162461bcd60e51b8152600401808060200182810382526025815260200180610d596025913960400191505060405180910390fd5b6020808201516000908152600790915260409020805460ff1680156104c8575060018260400151038160020154145b6105035760405162461bcd60e51b8152600401808060200182810382526026815260200180610d7e6026913960400191505060405180910390fd5b60068101541580159061051d575080600201548160060154145b15610566578160200151600254146105665760405162461bcd60e51b8152600401808060200182810382526031815260200180610df46031913960400191505060405180910390fd5b60048101546005820154600683015460808501515167ffffffffffffffff90931692156106445767ffffffffffffffff83811614156105d65760405162461bcd60e51b8152600401808060200182810382526021815260200180610da46021913960400191505060405180910390fd5b8460400151811061062e576040805162461bcd60e51b815260206004820152601960248201527f4f7665726c617070696e67207369676e616c7320666f756e6400000000000000604482015290519081900360640190fd5b5050825160608401516040850151600190930192015b8460400151811415610697576005805486516001820180845560009384527f036b6384b5eca791c62761152d0c79bb0604c104a5fb6f4eb0703f3154bb3db0909201558651825260066020526040909120555b6040805160e0810182526001808252602088810151818401908152898501518486019081526080808c01516060870190815267ffffffffffffffff8b169187019190915260a0860189905260c086018890528b51600090815260078552969096208551815460ff191690151517815591519382019390935591516002830155925180519293919261072e9260038501920190610c48565b50608082015160048201805467ffffffffffffffff191667ffffffffffffffff90921691909117905560a0820151600582015560c09091015160069091015550509151600055505050565b60008281526007602052604090206002015483146107c85760405162461bcd60e51b815260040180806020018281038252602f815260200180610dc5602f913960400191505060405180910390fd5b60028054600354600480546040805160206101006001851615026000190190931696909604601f81018390048302870183019091528086529394600094610879948a948a9467ffffffffffffffff9092169392909183018282801561086e5780601f106108435761010080835404028352916020019161086e565b820191906000526020600020905b81548152906001019060200180831161085157829003601f168201915b505050505087610ad1565b600081815260076020526040902060028281558101546001559091505b8282146109d657506000818152600760209081526040808320600181015460069093529220549092908015610958576005546000199182019181018214610924576000600560018303815481106108e957fe5b90600052602060002001549050806005848154811061090457fe5b600091825260208083209091019290925591825260069052604090208290555b600580548061092f57fe5b600082815260208082208301600019908101839055909201909255848252600690526040812055505b8260060154836002015414156109cf57600583015460009081526007602052604090206003805467ffffffffffffffff198116600167ffffffffffffffff928316810190921617825590820180546109c6926004929160026101009282161592909202600019011604610cc6565b505050506109d6565b5050610896565b505050505050565b6109e6610c13565b60008060008060008651602088016040516020810160208101602081016020810160a08588886010600019fa610a1b57600080fd5b84519b5083519a508251995081519850805197505050505050505060608167ffffffffffffffff81118015610a4f57600080fd5b506040519080825280601f01601f191660200182016040528015610a7a576020820181803683370190505b5090508115610aa45787516020890160208301848184846011600019fa610aa057600080fd5b5050505b6040805160a081018252968752602087019590955293850192909252606084015250608082015292915050565b600060608686868686604051602001808681526020018581526020018467ffffffffffffffff1667ffffffffffffffff1681526020018060200180602001838103835285818151815260200191508051906020019080838360005b83811015610b44578181015183820152602001610b2c565b50505050905090810190601f168015610b715780820380516001836020036101000a031916815260200191505b50838103825284518152845160209182019186019080838360005b83811015610ba4578181015183820152602001610b8c565b50505050905090810190601f168015610bd15780820380516001836020036101000a031916815260200191505b50975050505050505050604051602081830303815290604052905080516020820160008083836012600019fa610c0657600080fd5b5095979650505050505050565b6040518060a0016040528060008019168152602001600080191681526020016000815260200160008152602001606081525090565b828054600181600116156101000203166002900490600052602060002090601f016020900481019282601f10610c8957805160ff1916838001178555610cb6565b82800160010185558215610cb6579182015b82811115610cb6578251825591602001919060010190610c9b565b50610cc2929150610d3b565b5090565b828054600181600116156101000203166002900490600052602060002090601f016020900481019282601f10610cff5780548555610cb6565b82800160010185558215610cb657600052602060002091601f016020900482015b82811115610cb6578254825591600101919060010190610d20565b610d5591905b80821115610cc25760008155600101610d41565b9056fe547279696e6720746f20696d706f7274206e6f6e2d63616e6f6e6963616c206865616465724d697373696e6720706172656e74206865616465722066726f6d207468652073746f7261676552656163686564206d6178696d616c2076616c696461746f7273207365742069644d697373696e672066696e616c69747920746172676574206865616465722066726f6d207468652073746f726167654d697373696e672072657175697265642066696e616c6974792070726f6f6620666f7220706172656e7420686561646572a26469706673582212206a13d7fad990f5ff64cfe638f258f1cf2a4c72a3161a28ec200db444a3d61c2e64736f6c63430006060033 \ No newline at end of file diff --git a/bridges/relays/ethereum/res/substrate-bridge-metadata.txt b/bridges/relays/ethereum/res/substrate-bridge-metadata.txt index 0dc5d853f2..dcf50c5cfa 100644 --- a/bridges/relays/ethereum/res/substrate-bridge-metadata.txt +++ b/bridges/relays/ethereum/res/substrate-bridge-metadata.txt @@ -1,5 +1,5 @@ -Last Change Date: 2020-04-28 +Last Change Date: 2020-05-01 Solc version: 0.6.6+commit.6c089d02 -Source hash (keccak256): 0xdc46aff04e37129265223e507d17f1407a70cb1ecea3230e1eaa77a17586724d -Source gist: https://gist.github.com/svyatonik/876b388f9507a8de242cb2db9547c4f0 +Source hash (keccak256): 0x36403636ad41082ca6c937c60ab06446cd9ef7036c178fa2f04d7c8286544d39 +Source gist: https://github.com/svyatonik/substrate-bridge-sol/blob/8b54f5f648f8685fecd52b7af1deb277922b0fc3/substrate-bridge.sol Compiler flags used (command to produce the file): `docker run -i ethereum/solc:0.6.6 --optimize --bin - < substrate-bridge.sol` \ No newline at end of file diff --git a/bridges/relays/ethereum/src/ethereum_client.rs b/bridges/relays/ethereum/src/ethereum_client.rs index 8cc6078123..ac70c31044 100644 --- a/bridges/relays/ethereum/src/ethereum_client.rs +++ b/bridges/relays/ethereum/src/ethereum_client.rs @@ -15,7 +15,7 @@ // along with Parity Bridges Common. If not, see . use crate::ethereum_types::{Address, Bytes, EthereumHeaderId, Header, Receipt, TransactionHash, H256, U256, U64}; -use crate::substrate_types::{Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId}; +use crate::substrate_types::{GrandpaJustification, Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId}; use crate::sync_types::{HeaderId, MaybeConnectionError}; use crate::{bail_on_arg_error, bail_on_error}; use codec::{Decode, Encode}; @@ -26,6 +26,7 @@ use jsonrpsee::transport::http::{HttpTransportClient, RequestError}; use parity_crypto::publickey::KeyPair; use serde::{de::DeserializeOwned, Serialize}; use serde_json::{from_value, to_value}; +use std::collections::HashSet; // to encode/decode contract calls ethabi_contract::use_contract!(bridge_contract, "res/substrate-bridge-abi.json"); @@ -300,6 +301,64 @@ pub async fn submit_substrate_headers( (client, Ok(ids)) } +/// Returns ids of incomplete Substrate headers. +pub async fn incomplete_substrate_headers( + client: Client, + contract_address: Address, +) -> (Client, Result, Error>) { + let (encoded_call, call_decoder) = bridge_contract::functions::incomplete_headers::call(); + let call_request = bail_on_arg_error!( + to_value(CallRequest { + to: Some(contract_address), + data: Some(encoded_call.into()), + }) + .map_err(|e| Error::RequestSerialization(e)), + client + ); + let (client, call_result) = + bail_on_error!(call_rpc::(client, "eth_call", Params::Array(vec![call_request]),).await); + match call_decoder.decode(&call_result.0) { + Ok((incomplete_headers_numbers, incomplete_headers_hashes)) => ( + client, + Ok(incomplete_headers_numbers + .into_iter() + .zip(incomplete_headers_hashes) + .filter_map(|(number, hash)| { + if number != number.low_u32().into() { + return None; + } + + Some(HeaderId(number.low_u32(), hash)) + }) + .collect()), + ), + Err(error) => (client, Err(Error::ResponseParseFailed(format!("{}", error)))), + } +} + +/// Complete Substrate header. +pub async fn complete_substrate_header( + client: Client, + params: EthereumSigningParams, + contract_address: Address, + id: SubstrateHeaderId, + justification: GrandpaJustification, +) -> (Client, Result) { + let (client, _) = bail_on_error!( + submit_ethereum_transaction( + client, + ¶ms, + Some(contract_address), + None, + false, + bridge_contract::functions::import_finality_proof::encode_input(id.0, id.1, justification,), + ) + .await + ); + + (client, Ok(id)) +} + /// Deploy bridge contract. pub async fn deploy_bridge_contract( client: Client, diff --git a/bridges/relays/ethereum/src/ethereum_sync_loop.rs b/bridges/relays/ethereum/src/ethereum_sync_loop.rs index 7c1743a59e..c9e8a987f0 100644 --- a/bridges/relays/ethereum/src/ethereum_sync_loop.rs +++ b/bridges/relays/ethereum/src/ethereum_sync_loop.rs @@ -14,19 +14,21 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . +//! Ethereum PoA -> Substrate synchronization. + use crate::ethereum_client::{self, EthereumConnectionParams}; use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, QueuedEthereumHeader, Receipt}; use crate::substrate_client::{self, SubstrateConnectionParams, SubstrateSigningParams}; use crate::sync::{HeadersSyncParams, TargetTransactionMode}; -use crate::sync_loop::{SourceClient, TargetClient}; -use futures::future::FutureExt; -use std::{future::Future, pin::Pin}; +use crate::sync_loop::{OwnedSourceFutureOutput, OwnedTargetFutureOutput, SourceClient, TargetClient}; +use futures::future::{ready, FutureExt, Ready}; +use std::{collections::HashSet, future::Future, pin::Pin, time::Duration}; use web3::types::H256; -/// Interval (in ms) at which we check new Ethereum headers when we are synced/almost synced. -const ETHEREUM_TICK_INTERVAL_MS: u64 = 10_000; -/// Interval (in ms) at which we check new Substrate blocks. -const SUBSTRATE_TICK_INTERVAL_MS: u64 = 5_000; +/// Interval at which we check new Ethereum headers when we are synced/almost synced. +const ETHEREUM_TICK_INTERVAL: Duration = Duration::from_secs(10); +/// Interval at which we check new Substrate blocks. +const SUBSTRATE_TICK_INTERVAL: Duration = Duration::from_secs(5); /// Max number of headers in single submit transaction. const MAX_HEADERS_IN_SINGLE_SUBMIT: usize = 32; /// Max total size of headers in single submit transaction. This only affects signed @@ -76,13 +78,15 @@ struct EthereumHeadersSource { client: ethereum_client::Client, } +type EthereumFutureOutput = OwnedSourceFutureOutput; + impl SourceClient for EthereumHeadersSource { type Error = ethereum_client::Error; - type BestBlockNumberFuture = Pin)>>>; - type HeaderByHashFuture = Pin)>>>; - type HeaderByNumberFuture = Pin)>>>; - type HeaderExtraFuture = - Pin), Self::Error>)>>>; + type BestBlockNumberFuture = Pin>>>; + type HeaderByHashFuture = Pin>>>; + type HeaderByNumberFuture = Pin>>>; + type HeaderExtraFuture = Pin)>>>>; + type HeaderCompletionFuture = Ready)>>; fn best_block_number(self) -> Self::BestBlockNumberFuture { ethereum_client::best_block_number(self.client) @@ -107,6 +111,10 @@ impl SourceClient for EthereumHeadersSource { .map(|(client, result)| (EthereumHeadersSource { client }, result)) .boxed() } + + fn header_completion(self, id: EthereumHeaderId) -> Self::HeaderCompletionFuture { + ready((self, Ok((id, None)))) + } } /// Substrate client as Ethereum headers target. @@ -119,12 +127,16 @@ struct SubstrateHeadersTarget { sign_params: SubstrateSigningParams, } +type SubstrateFutureOutput = OwnedTargetFutureOutput; + impl TargetClient for SubstrateHeadersTarget { type Error = substrate_client::Error; - type BestHeaderIdFuture = Pin)>>>; - type IsKnownHeaderFuture = Pin)>>>; - type RequiresExtraFuture = Pin)>>>; - type SubmitHeadersFuture = Pin, Self::Error>)>>>; + type BestHeaderIdFuture = Pin>>>; + type IsKnownHeaderFuture = Pin>>>; + type RequiresExtraFuture = Pin>>>; + type SubmitHeadersFuture = Pin>>>>; + type IncompleteHeadersFuture = Ready>>; + type CompleteHeadersFuture = Ready>; fn best_header_id(self) -> Self::BestHeaderIdFuture { let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); @@ -192,6 +204,14 @@ impl TargetClient for SubstrateHeadersTarget { }) .boxed() } + + fn incomplete_headers_ids(self) -> Self::IncompleteHeadersFuture { + ready((self, Ok(HashSet::new()))) + } + + fn complete_header(self, id: EthereumHeaderId, _completion: ()) -> Self::CompleteHeadersFuture { + ready((self, Ok(id))) + } } /// Run Ethereum headers synchronization. @@ -206,13 +226,13 @@ pub fn run(params: EthereumSyncParams) { crate::sync_loop::run( EthereumHeadersSource { client: eth_client }, - ETHEREUM_TICK_INTERVAL_MS, + ETHEREUM_TICK_INTERVAL, SubstrateHeadersTarget { client: sub_client, sign_transactions: sign_sub_transactions, sign_params: params.sub_sign, }, - SUBSTRATE_TICK_INTERVAL_MS, + SUBSTRATE_TICK_INTERVAL, params.sync_params, ); } diff --git a/bridges/relays/ethereum/src/ethereum_types.rs b/bridges/relays/ethereum/src/ethereum_types.rs index d996ac7c29..4731570d4d 100644 --- a/bridges/relays/ethereum/src/ethereum_types.rs +++ b/bridges/relays/ethereum/src/ethereum_types.rs @@ -56,6 +56,7 @@ impl HeadersSyncPipeline for EthereumHeadersSyncPipeline { type Number = u64; type Header = Header; type Extra = Vec; + type Completion = (); fn estimate_size(source: &QueuedHeader) -> usize { into_substrate_ethereum_header(source.header()).encode().len() diff --git a/bridges/relays/ethereum/src/headers.rs b/bridges/relays/ethereum/src/headers.rs index 0e34b9a98f..a75937da8c 100644 --- a/bridges/relays/ethereum/src/headers.rs +++ b/bridges/relays/ethereum/src/headers.rs @@ -15,9 +15,11 @@ // along with Parity Bridges Common. If not, see . use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SourceHeader}; +use linked_hash_map::LinkedHashMap; use num_traits::{One, Zero}; -use std::collections::{ - btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry, BTreeMap, HashMap, HashSet, +use std::{ + collections::{btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry, BTreeMap, HashMap, HashSet}, + time::{Duration, Instant}, }; type HeadersQueue

= @@ -25,6 +27,9 @@ type HeadersQueue

= type KnownHeaders

= BTreeMap<

::Number, HashMap<

::Hash, HeaderStatus>>; +/// We're trying to fetch completion data for single header at this interval. +const RETRY_FETCH_COMPLETION_INTERVAL: Duration = Duration::from_secs(20); + /// Ethereum headers queue. #[derive(Debug)] pub struct QueuedHeaders { @@ -43,16 +48,34 @@ pub struct QueuedHeaders { extra: HeadersQueue

, /// Headers that are ready to be submitted to target node. ready: HeadersQueue

, + /// Headers that are ready to be submitted to target node, but their ancestor is incomplete. + /// Thus we're waiting for these ancestors to be completed first. + /// Note that the incomplete header itself is synced and it isn't in this queue. + incomplete: HeadersQueue

, /// Headers that are (we believe) currently submitted to target node by our, /// not-yet mined transactions. submitted: HeadersQueue

, /// Pointers to all headers that we ever seen and we believe we can touch in the future. known_headers: KnownHeaders

, + /// Headers that are waiting for completion data from source node. Mapped (and auto-sorted + /// by) to the last fetch time. + incomplete_headers: LinkedHashMap, Option>, + /// Headers that are waiting to be completed at target node. Auto-sorted by insertion time. + completion_data: LinkedHashMap, P::Completion>, /// Pruned blocks border. We do not store or accept any blocks with number less than /// this number. prune_border: P::Number, } +/// Header completion data. +#[derive(Debug)] +struct HeaderCompletion { + /// Last time when we tried to upload completion data to target node, if ever. + pub last_upload_time: Option, + /// Completion data. + pub completion: Completion, +} + impl QueuedHeaders

{ /// Returns new QueuedHeaders. pub fn new() -> Self { @@ -62,8 +85,11 @@ impl QueuedHeaders

{ maybe_extra: HeadersQueue::new(), extra: HeadersQueue::new(), ready: HeadersQueue::new(), + incomplete: HeadersQueue::new(), submitted: HeadersQueue::new(), known_headers: KnownHeaders::

::new(), + incomplete_headers: LinkedHashMap::new(), + completion_data: LinkedHashMap::new(), prune_border: Zero::zero(), } } @@ -89,6 +115,7 @@ impl QueuedHeaders

{ .fold(0, |total, headers| total + headers.len()), HeaderStatus::Extra => self.extra.values().fold(0, |total, headers| total + headers.len()), HeaderStatus::Ready => self.ready.values().fold(0, |total, headers| total + headers.len()), + HeaderStatus::Incomplete => self.incomplete.values().fold(0, |total, headers| total + headers.len()), HeaderStatus::Submitted => self.submitted.values().fold(0, |total, headers| total + headers.len()), } } @@ -105,6 +132,7 @@ impl QueuedHeaders

{ .fold(0, |total, headers| total + headers.len()) + self.extra.values().fold(0, |total, headers| total + headers.len()) + self.ready.values().fold(0, |total, headers| total + headers.len()) + + self.incomplete.values().fold(0, |total, headers| total + headers.len()) } /// Returns number of best block in the queue. @@ -119,7 +147,10 @@ impl QueuedHeaders

{ self.extra.keys().next_back().cloned().unwrap_or_else(Zero::zero), std::cmp::max( self.ready.keys().next_back().cloned().unwrap_or_else(Zero::zero), - self.submitted.keys().next_back().cloned().unwrap_or_else(Zero::zero), + std::cmp::max( + self.incomplete.keys().next_back().cloned().unwrap_or_else(Zero::zero), + self.submitted.keys().next_back().cloned().unwrap_or_else(Zero::zero), + ), ), ), ), @@ -145,6 +176,7 @@ impl QueuedHeaders

{ HeaderStatus::MaybeExtra => oldest_header(&self.maybe_extra), HeaderStatus::Extra => oldest_header(&self.extra), HeaderStatus::Ready => oldest_header(&self.ready), + HeaderStatus::Incomplete => oldest_header(&self.incomplete), HeaderStatus::Submitted => oldest_header(&self.submitted), } } @@ -162,6 +194,7 @@ impl QueuedHeaders

{ HeaderStatus::MaybeExtra => oldest_headers(&self.maybe_extra, f), HeaderStatus::Extra => oldest_headers(&self.extra, f), HeaderStatus::Ready => oldest_headers(&self.ready, f), + HeaderStatus::Incomplete => oldest_headers(&self.incomplete, f), HeaderStatus::Submitted => oldest_headers(&self.submitted, f), } } @@ -207,6 +240,7 @@ impl QueuedHeaders

{ HeaderStatus::MaybeExtra | HeaderStatus::Extra | HeaderStatus::Ready + | HeaderStatus::Incomplete | HeaderStatus::Submitted | HeaderStatus::Synced => { insert_header(&mut self.maybe_extra, id, header); @@ -226,67 +260,7 @@ impl QueuedHeaders

{ /// Receive best header from the target node. pub fn target_best_header_response(&mut self, id: &HeaderId) { - // all ancestors of this header are now synced => let's remove them from - // queues - let mut current = *id; - let mut id_processed = false; - loop { - let header = match self.status(¤t) { - HeaderStatus::Unknown => break, - HeaderStatus::MaybeOrphan => remove_header(&mut self.maybe_orphan, ¤t), - HeaderStatus::Orphan => remove_header(&mut self.orphan, ¤t), - HeaderStatus::MaybeExtra => remove_header(&mut self.maybe_extra, ¤t), - HeaderStatus::Extra => remove_header(&mut self.extra, ¤t), - HeaderStatus::Ready => remove_header(&mut self.ready, ¤t), - HeaderStatus::Submitted => remove_header(&mut self.submitted, ¤t), - HeaderStatus::Synced => break, - } - .expect("header has a given status; given queue has the header; qed"); - - log::debug!( - target: "bridge", - "{} header {:?} is now {:?}", - P::SOURCE_NAME, - current, - HeaderStatus::Synced, - ); - *self - .known_headers - .entry(current.0) - .or_default() - .entry(current.1) - .or_insert(HeaderStatus::Synced) = HeaderStatus::Synced; - current = header.parent_id(); - id_processed = true; - } - - // remember that the header is synced - if !id_processed { - // to avoid duplicate log message - log::debug!( - target: "bridge", - "{} header {:?} is now {:?}", - P::SOURCE_NAME, - id, - HeaderStatus::Synced, - ); - *self - .known_headers - .entry(id.0) - .or_default() - .entry(id.1) - .or_insert(HeaderStatus::Synced) = HeaderStatus::Synced; - } - - // now let's move all descendants from maybe_orphan && orphan queues to - // maybe_extra queue - move_header_descendants::

( - &mut [&mut self.maybe_orphan, &mut self.orphan], - &mut self.maybe_extra, - &mut self.known_headers, - HeaderStatus::MaybeExtra, - id, - ); + self.header_synced(id) } /// Receive target node response for MaybeOrphan request. @@ -315,6 +289,8 @@ impl QueuedHeaders

{ pub fn maybe_extra_response(&mut self, id: &HeaderId, response: bool) { let (destination_status, destination_queue) = if response { (HeaderStatus::Extra, &mut self.extra) + } else if self.is_parent_incomplete(id) { + (HeaderStatus::Incomplete, &mut self.incomplete) } else { (HeaderStatus::Ready, &mut self.ready) }; @@ -331,17 +307,42 @@ impl QueuedHeaders

{ /// Receive extra from source node. pub fn extra_response(&mut self, id: &HeaderId, extra: P::Extra) { + let (destination_status, destination_queue) = if self.is_parent_incomplete(id) { + (HeaderStatus::Incomplete, &mut self.incomplete) + } else { + (HeaderStatus::Ready, &mut self.ready) + }; + // move header itself from extra to ready queue move_header( &mut self.extra, - &mut self.ready, + destination_queue, &mut self.known_headers, - HeaderStatus::Ready, + destination_status, id, |header| header.set_extra(extra), ); } + /// Receive completion response from source node. + pub fn completion_response(&mut self, id: &HeaderId, completion: Option) { + let completion = match completion { + Some(completion) => completion, + None => return, // we'll try refetch later + }; + + if self.incomplete_headers.remove(id).is_some() { + log::debug!( + target: "bridge", + "Received completion data from {} for header: {:?}", + P::SOURCE_NAME, + id, + ); + + self.completion_data.insert(id.clone(), completion); + } + } + /// When header is submitted to target node. pub fn headers_submitted(&mut self, ids: Vec>) { for id in ids { @@ -356,7 +357,110 @@ impl QueuedHeaders

{ } } - /// Prune and never accep headers before this block. + /// When header completion data is sent to target node. + pub fn header_completed(&mut self, id: &HeaderId) { + if self.completion_data.remove(id).is_some() { + log::debug!( + target: "bridge", + "Sent completion data to {} for header: {:?}", + P::TARGET_NAME, + id, + ); + + // transaction can be dropped by target chain nodes => it would never be mined + // + // in current implementation the sync loop would wait for some time && if best + // **source** header won't change on **target** node, then the sync will be restarted + // => we'll resubmit the same completion data again (the same is true for submitted + // headers) + // + // the other option would be to track emitted transactions at least on target node, + // but it won't give us 100% guarantee anyway + // + // => we're just dropping completion data just after it has been submitted + } + } + + /// When incomplete headers ids are receved from target node. + pub fn incomplete_headers_response(&mut self, ids: HashSet>) { + // all new incomplete headers are marked Synced and all their descendants + // are moved from Ready/Submitted to Incomplete queue + let new_incomplete_headers = ids + .iter() + .filter(|id| !self.incomplete_headers.contains_key(id) && !self.completion_data.contains_key(id)) + .cloned() + .collect::>(); + for new_incomplete_header in new_incomplete_headers { + self.header_synced(&new_incomplete_header); + move_header_descendants::

( + &mut [&mut self.ready, &mut self.submitted], + &mut self.incomplete, + &mut self.known_headers, + HeaderStatus::Incomplete, + &new_incomplete_header, + ); + + log::debug!( + target: "bridge", + "Scheduling completion data retrieval for header: {:?}", + new_incomplete_header, + ); + + self.incomplete_headers.insert(new_incomplete_header, None); + } + + // for all headers that were incompleted previously, but now are completed, we move + // all descendants from incomplete to ready + let just_completed_headers = self + .incomplete_headers + .keys() + .chain(self.completion_data.keys()) + .filter(|id| !ids.contains(id)) + .cloned() + .collect::>(); + for just_completed_header in just_completed_headers { + move_header_descendants::

( + &mut [&mut self.incomplete], + &mut self.ready, + &mut self.known_headers, + HeaderStatus::Ready, + &just_completed_header, + ); + + log::debug!( + target: "bridge", + "Completion data is no longer required for header: {:?}", + just_completed_header, + ); + + self.incomplete_headers.remove(&just_completed_header); + self.completion_data.remove(&just_completed_header); + } + } + + /// Returns id of the header for which we want to fetch completion data. + pub fn incomplete_header(&mut self) -> Option> { + queued_incomplete_header(&mut self.incomplete_headers, |last_fetch_time| { + let retry = match *last_fetch_time { + Some(last_fetch_time) => last_fetch_time.elapsed() > RETRY_FETCH_COMPLETION_INTERVAL, + None => true, + }; + + if retry { + *last_fetch_time = Some(Instant::now()); + } + + retry + }) + .map(|(id, _)| id) + } + + /// Returns header completion data to upload to target node. + pub fn header_to_complete(&mut self) -> Option<(HeaderId, &P::Completion)> { + queued_incomplete_header(&mut self.completion_data, |_| true) + } + + /// Prune and never accept headers before this block. pub fn prune(&mut self, prune_border: P::Number) { if prune_border <= self.prune_border { return; @@ -368,6 +472,7 @@ impl QueuedHeaders

{ prune_queue(&mut self.extra, prune_border); prune_queue(&mut self.ready, prune_border); prune_queue(&mut self.submitted, prune_border); + prune_queue(&mut self.incomplete, prune_border); prune_known_headers::

(&mut self.known_headers, prune_border); self.prune_border = prune_border; } @@ -379,10 +484,81 @@ impl QueuedHeaders

{ self.maybe_extra.clear(); self.extra.clear(); self.ready.clear(); + self.incomplete.clear(); self.submitted.clear(); self.known_headers.clear(); self.prune_border = Zero::zero(); } + + /// Returns true if parent of this header is either incomplete or waiting for + /// its own incomplete ancestor to be completed. + fn is_parent_incomplete(&self, id: &HeaderId) -> bool { + let status = self.status(id); + let header = match status { + HeaderStatus::MaybeOrphan => header(&self.maybe_orphan, id), + HeaderStatus::Orphan => header(&self.orphan, id), + HeaderStatus::MaybeExtra => header(&self.maybe_extra, id), + HeaderStatus::Extra => header(&self.extra, id), + HeaderStatus::Ready => header(&self.ready, id), + HeaderStatus::Incomplete => header(&self.incomplete, id), + HeaderStatus::Submitted => header(&self.submitted, id), + HeaderStatus::Unknown => return false, + HeaderStatus::Synced => return false, + }; + + match header { + Some(header) => { + let parent_id = header.header().parent_id(); + self.incomplete_headers.contains_key(&parent_id) + || self.completion_data.contains_key(&parent_id) + || self.status(&parent_id) == HeaderStatus::Incomplete + } + None => false, + } + } + + /// When we receive new Synced header from target node. + fn header_synced(&mut self, id: &HeaderId) { + // all ancestors of this header are now synced => let's remove them from + // queues + let mut current = *id; + let mut id_processed = false; + loop { + let header = match self.status(¤t) { + HeaderStatus::Unknown => break, + HeaderStatus::MaybeOrphan => remove_header(&mut self.maybe_orphan, ¤t), + HeaderStatus::Orphan => remove_header(&mut self.orphan, ¤t), + HeaderStatus::MaybeExtra => remove_header(&mut self.maybe_extra, ¤t), + HeaderStatus::Extra => remove_header(&mut self.extra, ¤t), + HeaderStatus::Ready => remove_header(&mut self.ready, ¤t), + HeaderStatus::Incomplete => remove_header(&mut self.incomplete, ¤t), + HeaderStatus::Submitted => remove_header(&mut self.submitted, ¤t), + HeaderStatus::Synced => break, + } + .expect("header has a given status; given queue has the header; qed"); + + set_header_status::

(&mut self.known_headers, ¤t, HeaderStatus::Synced); + + current = header.parent_id(); + id_processed = true; + } + + // remember that the header itself is synced + // (condition is here to avoid duplicate log messages) + if !id_processed { + set_header_status::

(&mut self.known_headers, &id, HeaderStatus::Synced); + } + + // now let's move all descendants from maybe_orphan && orphan queues to + // maybe_extra queue + move_header_descendants::

( + &mut [&mut self.maybe_orphan, &mut self.orphan], + &mut self.maybe_extra, + &mut self.known_headers, + HeaderStatus::MaybeExtra, + id, + ); + } } /// Insert header to the queue. @@ -411,6 +587,14 @@ fn remove_header( header } +/// Get header from the queue. +fn header<'a, P: HeadersSyncPipeline>( + queue: &'a HeadersQueue

, + id: &HeaderId, +) -> Option<&'a QueuedHeader

> { + queue.get(&id.0).and_then(|by_hash| by_hash.get(&id.1)) +} + /// Move header from source to destination queue. /// /// Returns ID of parent header, if header has been moved, or None otherwise. @@ -428,16 +612,8 @@ fn move_header( }; let parent_id = header.header().parent_id(); - known_headers.entry(id.0).or_default().insert(id.1, destination_status); destination_queue.entry(id.0).or_default().insert(id.1, header); - - log::debug!( - target: "bridge", - "{} header {:?} is now {:?}", - P::SOURCE_NAME, - id, - destination_status, - ); + set_header_status::

(known_headers, id, destination_status); Some(parent_id) } @@ -473,19 +649,8 @@ fn move_header_descendants( if current_parents.contains(&entry.get().header().parent_id().1) { let header_to_move = entry.remove(); let header_to_move_id = header_to_move.id(); - known_headers - .entry(header_to_move_id.0) - .or_default() - .insert(header_to_move_id.1, destination_status); headers_to_move.push((header_to_move_id, header_to_move)); - - log::debug!( - target: "bridge", - "{} header {:?} is now {:?}", - P::SOURCE_NAME, - header_to_move_id, - destination_status, - ); + set_header_status::

(known_headers, &header_to_move_id, destination_status); } } @@ -544,6 +709,44 @@ fn prune_known_headers(known_headers: &mut KnownHeaders< *known_headers = new_known_headers; } +/// Change header status. +fn set_header_status( + known_headers: &mut KnownHeaders

, + id: &HeaderId, + status: HeaderStatus, +) { + log::debug!( + target: "bridge", + "{} header {:?} is now {:?}", + P::SOURCE_NAME, + id, + status, + ); + *known_headers.entry(id.0).or_default().entry(id.1).or_insert(status) = status; +} + +/// Returns queued incomplete header with maximal elapsed time since last update. +fn queued_incomplete_header( + map: &mut LinkedHashMap, + filter: impl FnMut(&mut T) -> bool, +) -> Option<(Id, &T)> { + // TODO (#84): headers that have been just appended to the end of the queue would have to wait until + // all previous headers will be retried + + let retry_old_header = map + .front() + .map(|(key, _)| key.clone()) + .and_then(|key| map.get_mut(&key).map(filter)) + .unwrap_or(false); + if retry_old_header { + let (header_key, header) = map.pop_front().expect("we have checked that front() exists; qed"); + map.insert(header_key, header); + return map.back().map(|(id, data)| (id.clone(), data)); + } + + None +} + #[cfg(test)] pub(crate) mod tests { use super::*; @@ -595,7 +798,11 @@ pub(crate) mod tests { hash(6), QueuedHeader::::new(Default::default()), ); - assert_eq!(queue.total_headers(), 6); + queue.incomplete.entry(6).or_default().insert( + hash(7), + QueuedHeader::::new(Default::default()), + ); + assert_eq!(queue.total_headers(), 7); } #[test] @@ -639,6 +846,12 @@ pub(crate) mod tests { QueuedHeader::::new(Default::default()), ); assert_eq!(queue.best_queued_number(), 40); + // and then there's some header in Incomplete + queue.incomplete.entry(50).or_default().insert( + hash(50), + QueuedHeader::::new(Default::default()), + ); + assert_eq!(queue.best_queued_number(), 50); } #[test] @@ -946,6 +1159,7 @@ pub(crate) mod tests { #[test] fn negative_maybe_extra_response_works() { + // when parent header is complete let mut queue = QueuedHeaders::::new(); queue .known_headers @@ -957,10 +1171,24 @@ pub(crate) mod tests { assert!(queue.maybe_extra.is_empty()); assert_eq!(queue.ready.len(), 1); assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Ready); + + // when parent header is incomplete + queue.incomplete_headers.insert(id(200), None); + queue + .known_headers + .entry(201) + .or_default() + .insert(hash(201), HeaderStatus::MaybeExtra); + queue.maybe_extra.entry(201).or_default().insert(hash(201), header(201)); + queue.maybe_extra_response(&id(201), false); + assert!(queue.maybe_extra.is_empty()); + assert_eq!(queue.incomplete.len(), 1); + assert_eq!(queue.known_headers[&201][&hash(201)], HeaderStatus::Incomplete); } #[test] fn receipts_response_works() { + // when parent header is complete let mut queue = QueuedHeaders::::new(); queue .known_headers @@ -972,6 +1200,19 @@ pub(crate) mod tests { assert!(queue.extra.is_empty()); assert_eq!(queue.ready.len(), 1); assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Ready); + + // when parent header is incomplete + queue.incomplete_headers.insert(id(200), None); + queue + .known_headers + .entry(201) + .or_default() + .insert(hash(201), HeaderStatus::Extra); + queue.extra.entry(201).or_default().insert(hash(201), header(201)); + queue.extra_response(&id(201), Vec::new()); + assert!(queue.extra.is_empty()); + assert_eq!(queue.incomplete.len(), 1); + assert_eq!(queue.known_headers[&201][&hash(201)], HeaderStatus::Incomplete); } #[test] @@ -988,9 +1229,172 @@ pub(crate) mod tests { assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Submitted); } + #[test] + fn incomplete_header_works() { + let mut queue = QueuedHeaders::::new(); + + // nothing to complete if queue is empty + assert_eq!(queue.incomplete_header(), None); + + // when there's new header to complete => ask for completion data + queue.incomplete_headers.insert(id(100), None); + assert_eq!(queue.incomplete_header(), Some(id(100))); + + // we have just asked for completion data => nothing to request + assert_eq!(queue.incomplete_header(), None); + + // enough time have passed => ask again + queue.incomplete_headers.clear(); + queue.incomplete_headers.insert( + id(100), + Some(Instant::now() - RETRY_FETCH_COMPLETION_INTERVAL - RETRY_FETCH_COMPLETION_INTERVAL), + ); + assert_eq!(queue.incomplete_header(), Some(id(100))); + } + + #[test] + fn completion_response_works() { + let mut queue = QueuedHeaders::::new(); + queue.incomplete_headers.insert(id(100), None); + queue.incomplete_headers.insert(id(200), Some(Instant::now())); + + // when headers isn't incompete, nothing changes + queue.completion_response(&id(300), None); + assert_eq!(queue.incomplete_headers.len(), 2); + assert_eq!(queue.completion_data.len(), 0); + assert_eq!(queue.header_to_complete(), None); + + // when response is None, nothing changes + queue.completion_response(&id(100), None); + assert_eq!(queue.incomplete_headers.len(), 2); + assert_eq!(queue.completion_data.len(), 0); + assert_eq!(queue.header_to_complete(), None); + + // when response is Some, we're scheduling completion + queue.completion_response(&id(200), Some(())); + assert_eq!(queue.incomplete_headers.len(), 1); + assert_eq!(queue.completion_data.len(), 1); + assert!(queue.incomplete_headers.contains_key(&id(100))); + assert!(queue.completion_data.contains_key(&id(200))); + assert_eq!(queue.header_to_complete(), Some((id(200), &()))); + } + + #[test] + fn header_completed_works() { + let mut queue = QueuedHeaders::::new(); + queue.completion_data.insert(id(100), ()); + + // when unknown header is completed + queue.header_completed(&id(200)); + assert_eq!(queue.completion_data.len(), 1); + + // when known header is completed + queue.header_completed(&id(100)); + assert_eq!(queue.completion_data.len(), 0); + } + + #[test] + fn incomplete_headers_response_works() { + let mut queue = QueuedHeaders::::new(); + + // when we have already submitted #101 and #102 is ready + queue + .known_headers + .entry(101) + .or_default() + .insert(hash(101), HeaderStatus::Submitted); + queue.submitted.entry(101).or_default().insert(hash(101), header(101)); + queue + .known_headers + .entry(102) + .or_default() + .insert(hash(102), HeaderStatus::Ready); + queue.submitted.entry(102).or_default().insert(hash(102), header(102)); + + // AND now we know that the #100 is incomplete + queue.incomplete_headers_response(vec![id(100)].into_iter().collect()); + + // => #101 and #102 are moved to the Incomplete and #100 is now synced + assert_eq!(queue.status(&id(100)), HeaderStatus::Synced); + assert_eq!(queue.status(&id(101)), HeaderStatus::Incomplete); + assert_eq!(queue.status(&id(102)), HeaderStatus::Incomplete); + assert_eq!(queue.submitted.len(), 0); + assert_eq!(queue.ready.len(), 0); + assert!(queue.incomplete.entry(101).or_default().contains_key(&hash(101))); + assert!(queue.incomplete.entry(102).or_default().contains_key(&hash(102))); + assert!(queue.incomplete_headers.contains_key(&id(100))); + assert!(queue.completion_data.is_empty()); + + // and then header #100 is no longer incomplete + queue.incomplete_headers_response(vec![].into_iter().collect()); + + // => #101 and #102 are moved to the Ready queue and #100 if now forgotten + assert_eq!(queue.status(&id(100)), HeaderStatus::Synced); + assert_eq!(queue.status(&id(101)), HeaderStatus::Ready); + assert_eq!(queue.status(&id(102)), HeaderStatus::Ready); + assert_eq!(queue.incomplete.len(), 0); + assert_eq!(queue.submitted.len(), 0); + assert!(queue.ready.entry(101).or_default().contains_key(&hash(101))); + assert!(queue.ready.entry(102).or_default().contains_key(&hash(102))); + assert!(queue.incomplete_headers.is_empty()); + assert!(queue.completion_data.is_empty()); + } + + #[test] + fn is_parent_incomplete_works() { + let mut queue = QueuedHeaders::::new(); + + // when we do not know header itself + assert_eq!(queue.is_parent_incomplete(&id(50)), false); + + // when we do not know parent + queue + .known_headers + .entry(100) + .or_default() + .insert(hash(100), HeaderStatus::Incomplete); + queue.incomplete.entry(100).or_default().insert(hash(100), header(100)); + assert_eq!(queue.is_parent_incomplete(&id(100)), false); + + // when parent is inside incomplete queue (i.e. some other ancestor is actually incomplete) + queue + .known_headers + .entry(101) + .or_default() + .insert(hash(101), HeaderStatus::Submitted); + queue.submitted.entry(101).or_default().insert(hash(101), header(101)); + assert_eq!(queue.is_parent_incomplete(&id(101)), true); + + // when parent is the incomplete header and we do not have completion data + queue.incomplete_headers.insert(id(199), None); + queue + .known_headers + .entry(200) + .or_default() + .insert(hash(200), HeaderStatus::Submitted); + queue.submitted.entry(200).or_default().insert(hash(200), header(200)); + assert_eq!(queue.is_parent_incomplete(&id(200)), true); + + // when parent is the incomplete header and we have completion data + queue.completion_data.insert(id(299), ()); + queue + .known_headers + .entry(300) + .or_default() + .insert(hash(300), HeaderStatus::Submitted); + queue.submitted.entry(300).or_default().insert(hash(300), header(300)); + assert_eq!(queue.is_parent_incomplete(&id(300)), true); + } + #[test] fn prune_works() { let mut queue = QueuedHeaders::::new(); + queue + .known_headers + .entry(105) + .or_default() + .insert(hash(105), HeaderStatus::Incomplete); + queue.incomplete.entry(105).or_default().insert(hash(105), header(105)); queue .known_headers .entry(104) @@ -1033,7 +1437,8 @@ pub(crate) mod tests { assert_eq!(queue.maybe_extra.len(), 1); assert_eq!(queue.orphan.len(), 1); assert_eq!(queue.maybe_orphan.len(), 1); - assert_eq!(queue.known_headers.len(), 3); + assert_eq!(queue.incomplete.len(), 1); + assert_eq!(queue.known_headers.len(), 4); queue.prune(110); @@ -1042,6 +1447,7 @@ pub(crate) mod tests { assert_eq!(queue.maybe_extra.len(), 0); assert_eq!(queue.orphan.len(), 0); assert_eq!(queue.maybe_orphan.len(), 0); + assert_eq!(queue.incomplete.len(), 0); assert_eq!(queue.known_headers.len(), 0); queue.header_response(header(109).header().clone()); @@ -1050,4 +1456,51 @@ pub(crate) mod tests { queue.header_response(header(110).header().clone()); assert_eq!(queue.known_headers.len(), 1); } + + #[test] + fn incomplete_headers_are_still_incomplete_after_advance() { + let mut queue = QueuedHeaders::::new(); + + // relay#1 knows that header#100 is incomplete && it has headers 101..104 in incomplete queue + queue.incomplete_headers.insert(id(100), None); + queue.incomplete.entry(101).or_default().insert(hash(101), header(101)); + queue.incomplete.entry(102).or_default().insert(hash(102), header(102)); + queue.incomplete.entry(103).or_default().insert(hash(103), header(103)); + queue.incomplete.entry(104).or_default().insert(hash(104), header(104)); + queue + .known_headers + .entry(100) + .or_default() + .insert(hash(100), HeaderStatus::Synced); + queue + .known_headers + .entry(101) + .or_default() + .insert(hash(101), HeaderStatus::Incomplete); + queue + .known_headers + .entry(102) + .or_default() + .insert(hash(102), HeaderStatus::Incomplete); + queue + .known_headers + .entry(103) + .or_default() + .insert(hash(103), HeaderStatus::Incomplete); + queue + .known_headers + .entry(104) + .or_default() + .insert(hash(104), HeaderStatus::Incomplete); + + // let's say relay#2 completes header#100 and then submits header#101+header#102 and it turns + // out that header#102 is also incomplete + queue.incomplete_headers_response(vec![id(102)].into_iter().collect()); + + // then the header#103 and the header#104 must have Incomplete status + assert_eq!(queue.status(&id(101)), HeaderStatus::Synced); + assert_eq!(queue.status(&id(102)), HeaderStatus::Synced); + assert_eq!(queue.status(&id(103)), HeaderStatus::Incomplete); + assert_eq!(queue.status(&id(104)), HeaderStatus::Incomplete); + } } diff --git a/bridges/relays/ethereum/src/substrate_client.rs b/bridges/relays/ethereum/src/substrate_client.rs index 3d74dec250..e62c12e31b 100644 --- a/bridges/relays/ethereum/src/substrate_client.rs +++ b/bridges/relays/ethereum/src/substrate_client.rs @@ -16,7 +16,8 @@ use crate::ethereum_types::{Bytes, EthereumHeaderId, QueuedEthereumHeader, H256}; use crate::substrate_types::{ - into_substrate_ethereum_header, into_substrate_ethereum_receipts, Hash, Header as SubstrateHeader, Number, + into_substrate_ethereum_header, into_substrate_ethereum_receipts, GrandpaJustification, Hash, + Header as SubstrateHeader, Number, SignedBlock as SignedSubstrateBlock, SubstrateHeaderId, }; use crate::sync_types::{HeaderId, MaybeConnectionError, SourceHeader}; use crate::{bail_on_arg_error, bail_on_error}; @@ -274,6 +275,19 @@ pub async fn submit_unsigned_ethereum_headers( (client, Ok(ids)) } +/// Get GRANDPA justification for given block. +pub async fn grandpa_justification( + client: Client, + id: SubstrateHeaderId, +) -> (Client, Result<(SubstrateHeaderId, Option), Error>) { + let hash = bail_on_arg_error!(to_value(id.1).map_err(|e| Error::RequestSerialization(e)), client); + let (client, signed_block) = call_rpc(client, "chain_getBlock", Params::Array(vec![hash]), rpc_returns_value).await; + ( + client, + signed_block.map(|signed_block: SignedSubstrateBlock| (id, signed_block.justification)), + ) +} + /// Get GRANDPA authorities set at given block. pub async fn grandpa_authorities_set(client: Client, block: Hash) -> (Client, Result, Error>) { let block = bail_on_arg_error!(to_value(block).map_err(|e| Error::RequestSerialization(e)), client); diff --git a/bridges/relays/ethereum/src/substrate_sync_loop.rs b/bridges/relays/ethereum/src/substrate_sync_loop.rs index 2c194dac01..88dc4b6276 100644 --- a/bridges/relays/ethereum/src/substrate_sync_loop.rs +++ b/bridges/relays/ethereum/src/substrate_sync_loop.rs @@ -14,22 +14,24 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . +//! Substrate -> Ethereum synchronization. + use crate::ethereum_client::{self, EthereumConnectionParams, EthereumSigningParams}; use crate::ethereum_types::Address; use crate::substrate_client::{self, SubstrateConnectionParams}; use crate::substrate_types::{ - Hash, Header, Number, QueuedSubstrateHeader, SubstrateHeaderId, SubstrateHeadersSyncPipeline, + GrandpaJustification, Hash, Header, Number, QueuedSubstrateHeader, SubstrateHeaderId, SubstrateHeadersSyncPipeline, }; use crate::sync::{HeadersSyncParams, TargetTransactionMode}; -use crate::sync_loop::{SourceClient, TargetClient}; +use crate::sync_loop::{OwnedSourceFutureOutput, OwnedTargetFutureOutput, SourceClient, TargetClient}; use crate::sync_types::SourceHeader; use futures::future::{ready, FutureExt, Ready}; -use std::{future::Future, pin::Pin}; +use std::{collections::HashSet, future::Future, pin::Pin, time::Duration}; -/// Interval (in ms) at which we check new Substrate headers when we are synced/almost synced. -const SUBSTRATE_TICK_INTERVAL_MS: u64 = 10_000; -/// Interval (in ms) at which we check new Ethereum blocks. -const ETHEREUM_TICK_INTERVAL_MS: u64 = 5_000; +/// Interval at which we check new Substrate headers when we are synced/almost synced. +const SUBSTRATE_TICK_INTERVAL: Duration = Duration::from_secs(10); +/// Interval at which we check new Ethereum blocks. +const ETHEREUM_TICK_INTERVAL: Duration = Duration::from_secs(5); /// Max Ethereum headers we want to have in all 'before-submitted' states. const MAX_FUTURE_HEADERS_TO_DOWNLOAD: usize = 8; /// Max Ethereum headers count we want to have in 'submitted' state. @@ -83,12 +85,16 @@ struct SubstrateHeadersSource { client: substrate_client::Client, } +type SubstrateFutureOutput = OwnedSourceFutureOutput; + impl SourceClient for SubstrateHeadersSource { type Error = substrate_client::Error; - type BestBlockNumberFuture = Pin)>>>; - type HeaderByHashFuture = Pin)>>>; - type HeaderByNumberFuture = Pin)>>>; - type HeaderExtraFuture = Ready<(Self, Result<(SubstrateHeaderId, ()), Self::Error>)>; + type BestBlockNumberFuture = Pin>>>; + type HeaderByHashFuture = Pin>>>; + type HeaderByNumberFuture = Pin>>>; + type HeaderExtraFuture = Ready>; + type HeaderCompletionFuture = + Pin)>>>>; fn best_block_number(self) -> Self::BestBlockNumberFuture { substrate_client::best_header(self.client) @@ -111,6 +117,12 @@ impl SourceClient for SubstrateHeadersSource { fn header_extra(self, id: SubstrateHeaderId, _header: &Header) -> Self::HeaderExtraFuture { ready((self, Ok((id, ())))) } + + fn header_completion(self, id: SubstrateHeaderId) -> Self::HeaderCompletionFuture { + substrate_client::grandpa_justification(self.client, id) + .map(|(client, result)| (SubstrateHeadersSource { client }, result)) + .boxed() + } } /// Ethereum client as Substrate headers target. @@ -123,12 +135,16 @@ struct EthereumHeadersTarget { sign_params: EthereumSigningParams, } +type EthereumFutureOutput = OwnedTargetFutureOutput; + impl TargetClient for EthereumHeadersTarget { type Error = ethereum_client::Error; - type BestHeaderIdFuture = Pin)>>>; - type IsKnownHeaderFuture = Pin)>>>; - type RequiresExtraFuture = Ready<(Self, Result<(SubstrateHeaderId, bool), Self::Error>)>; - type SubmitHeadersFuture = Pin, Self::Error>)>>>; + type BestHeaderIdFuture = Pin>>>; + type IsKnownHeaderFuture = Pin>>>; + type RequiresExtraFuture = Ready>; + type SubmitHeadersFuture = Pin>>>>; + type IncompleteHeadersFuture = Pin>>>>; + type CompleteHeadersFuture = Pin>>>; fn best_header_id(self) -> Self::BestHeaderIdFuture { let (contract, sign_params) = (self.contract, self.sign_params); @@ -181,6 +197,38 @@ impl TargetClient for EthereumHeadersTarget { }) .boxed() } + + fn incomplete_headers_ids(self) -> Self::IncompleteHeadersFuture { + let (contract, sign_params) = (self.contract, self.sign_params); + ethereum_client::incomplete_substrate_headers(self.client, contract) + .map(move |(client, result)| { + ( + EthereumHeadersTarget { + client, + contract, + sign_params, + }, + result, + ) + }) + .boxed() + } + + fn complete_header(self, id: SubstrateHeaderId, completion: GrandpaJustification) -> Self::CompleteHeadersFuture { + let (contract, sign_params) = (self.contract, self.sign_params); + ethereum_client::complete_substrate_header(self.client, sign_params.clone(), contract, id, completion) + .map(move |(client, result)| { + ( + EthereumHeadersTarget { + client, + contract, + sign_params, + }, + result, + ) + }) + .boxed() + } } /// Run Substrate headers synchronization. @@ -190,13 +238,13 @@ pub fn run(params: SubstrateSyncParams) { crate::sync_loop::run( SubstrateHeadersSource { client: sub_client }, - SUBSTRATE_TICK_INTERVAL_MS, + SUBSTRATE_TICK_INTERVAL, EthereumHeadersTarget { client: eth_client, contract: params.eth_contract_address, sign_params: params.eth_sign, }, - ETHEREUM_TICK_INTERVAL_MS, + ETHEREUM_TICK_INTERVAL, params.sync_params, ); } diff --git a/bridges/relays/ethereum/src/substrate_types.rs b/bridges/relays/ethereum/src/substrate_types.rs index d657f0269d..89e8572cca 100644 --- a/bridges/relays/ethereum/src/substrate_types.rs +++ b/bridges/relays/ethereum/src/substrate_types.rs @@ -34,6 +34,12 @@ pub type Number = bridge_node_runtime::BlockNumber; /// Substrate header type. pub type Header = bridge_node_runtime::Header; +/// Substrate signed block type. +pub type SignedBlock = bridge_node_runtime::SignedBlock; + +/// GRANDPA justification. +pub type GrandpaJustification = Vec; + /// Substrate header ID. pub type SubstrateHeaderId = HeaderId; @@ -53,6 +59,7 @@ impl HeadersSyncPipeline for SubstrateHeadersSyncPipeline { type Number = bridge_node_runtime::BlockNumber; type Header = Header; type Extra = (); + type Completion = GrandpaJustification; fn estimate_size(source: &QueuedHeader) -> usize { source.header().encode().len() diff --git a/bridges/relays/ethereum/src/sync_loop.rs b/bridges/relays/ethereum/src/sync_loop.rs index b2e97c9794..ce89a846a0 100644 --- a/bridges/relays/ethereum/src/sync_loop.rs +++ b/bridges/relays/ethereum/src/sync_loop.rs @@ -18,10 +18,14 @@ use crate::sync::HeadersSyncParams; use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, MaybeConnectionError, QueuedHeader}; use futures::{future::FutureExt, stream::StreamExt}; use num_traits::{Saturating, Zero}; -use std::future::Future; +use std::{ + collections::HashSet, + future::Future, + time::{Duration, Instant}, +}; /// When we submit headers to target node, but see no updates of best -/// source block known to target node during STALL_SYNC_TIMEOUT_MS milliseconds, +/// source block known to target node during STALL_SYNC_TIMEOUT seconds, /// we consider that our headers are rejected because there has been reorg in target chain. /// This reorg could invalidate our knowledge about sync process (i.e. we have asked if /// HeaderA is known to target, but then reorg happened and the answer is different @@ -30,26 +34,35 @@ use std::future::Future; /// direct child of previous best header. But: (1) subscription doesn't guarantee that /// the subscriber will receive every best header (2) reorg won't always lead to sync /// stall and restart is a heavy operation (we forget all in-memory headers). -const STALL_SYNC_TIMEOUT_MS: u64 = 30_000; -/// Delay (in milliseconds) after we have seen update of best source header at target node, +const STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(5 * 60); +/// Delay after we have seen update of best source header at target node, /// for us to treat sync stalled. ONLY when relay operates in backup mode. -const BACKUP_STALL_SYNC_TIMEOUT_MS: u64 = 5 * 60_000; -/// Delay (in milliseconds) after connection-related error happened before we'll try +const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60); +/// Delay after connection-related error happened before we'll try /// reconnection again. -const CONNECTION_ERROR_DELAY_MS: u64 = 10_000; +const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10); + +/// Type alias for all SourceClient futures. +pub type OwnedSourceFutureOutput = (Client, Result>::Error>); +/// Type alias for all TargetClient futures. +pub type OwnedTargetFutureOutput = (Client, Result>::Error>); /// Source client trait. pub trait SourceClient: Sized { /// Type of error this clients returns. type Error: std::fmt::Debug + MaybeConnectionError; /// Future that returns best block number. - type BestBlockNumberFuture: Future)>; + type BestBlockNumberFuture: Future>; /// Future that returns header by hash. - type HeaderByHashFuture: Future)>; + type HeaderByHashFuture: Future>; /// Future that returns header by number. - type HeaderByNumberFuture: Future)>; + type HeaderByNumberFuture: Future>; /// Future that returns extra data associated with header. - type HeaderExtraFuture: Future, P::Extra), Self::Error>)>; + type HeaderExtraFuture: Future, P::Extra)>>; + /// Future that returns data required to 'complete' header. + type HeaderCompletionFuture: Future< + Output = OwnedSourceFutureOutput, Option)>, + >; /// Get best block number. fn best_block_number(self) -> Self::BestBlockNumberFuture; @@ -59,6 +72,8 @@ pub trait SourceClient: Sized { fn header_by_number(self, number: P::Number) -> Self::HeaderByNumberFuture; /// Get extra data by header hash. fn header_extra(self, id: HeaderId, header: &P::Header) -> Self::HeaderExtraFuture; + /// Get completion data by header hash. + fn header_completion(self, id: HeaderId) -> Self::HeaderCompletionFuture; } /// Target client trait. @@ -66,13 +81,19 @@ pub trait TargetClient: Sized { /// Type of error this clients returns. type Error: std::fmt::Debug + MaybeConnectionError; /// Future that returns best header id. - type BestHeaderIdFuture: Future, Self::Error>)>; + type BestHeaderIdFuture: Future>>; /// Future that returns known header check result. - type IsKnownHeaderFuture: Future, bool), Self::Error>)>; + type IsKnownHeaderFuture: Future, bool)>>; /// Future that returns extra check result. - type RequiresExtraFuture: Future, bool), Self::Error>)>; + type RequiresExtraFuture: Future, bool)>>; /// Future that returns header submission result. - type SubmitHeadersFuture: Future>, Self::Error>)>; + type SubmitHeadersFuture: Future>>>; + /// Future that returns incomplete headers ids. + type IncompleteHeadersFuture: Future< + Output = OwnedTargetFutureOutput>>, + >; + /// Future that returns header completion result. + type CompleteHeadersFuture: Future>>; /// Returns ID of best header known to the target node. fn best_header_id(self) -> Self::BestHeaderIdFuture; @@ -82,23 +103,31 @@ pub trait TargetClient: Sized { fn requires_extra(self, header: &QueuedHeader

) -> Self::RequiresExtraFuture; /// Submit headers. fn submit_headers(self, headers: Vec>) -> Self::SubmitHeadersFuture; + /// Returns ID of headers that require to be 'completed' before children can be submitted. + fn incomplete_headers_ids(self) -> Self::IncompleteHeadersFuture; + /// Submit completion data for header. + fn complete_header( + self, + id: HeaderId, + completion: P::Completion, + ) -> Self::CompleteHeadersFuture; } /// Run headers synchronization. pub fn run( source_client: impl SourceClient

, - source_tick_ms: u64, + source_tick: Duration, target_client: impl TargetClient

, - target_tick_ms: u64, + target_tick: Duration, sync_params: HeadersSyncParams, ) { let mut local_pool = futures::executor::LocalPool::new(); - let mut progress_context = (std::time::Instant::now(), None, None); + let mut progress_context = (Instant::now(), None, None); local_pool.run_until(async move { let mut sync = crate::sync::HeadersSync::

::new(sync_params); let mut stall_countdown = None; - let mut last_update_time = std::time::Instant::now(); + let mut last_update_time = Instant::now(); let mut source_maybe_client = None; let mut source_best_block_number_required = false; @@ -106,29 +135,36 @@ pub fn run( let source_new_header_future = futures::future::Fuse::terminated(); let source_orphan_header_future = futures::future::Fuse::terminated(); let source_extra_future = futures::future::Fuse::terminated(); + let source_completion_future = futures::future::Fuse::terminated(); let source_go_offline_future = futures::future::Fuse::terminated(); - let source_tick_stream = interval(source_tick_ms).fuse(); + let source_tick_stream = interval(source_tick).fuse(); let mut target_maybe_client = None; let mut target_best_block_required = false; + let mut target_incomplete_headers_required = true; let target_best_block_future = target_client.best_header_id().fuse(); + let target_incomplete_headers_future = futures::future::Fuse::terminated(); let target_extra_check_future = futures::future::Fuse::terminated(); let target_existence_status_future = futures::future::Fuse::terminated(); let target_submit_header_future = futures::future::Fuse::terminated(); + let target_complete_header_future = futures::future::Fuse::terminated(); let target_go_offline_future = futures::future::Fuse::terminated(); - let target_tick_stream = interval(target_tick_ms).fuse(); + let target_tick_stream = interval(target_tick).fuse(); futures::pin_mut!( source_best_block_number_future, source_new_header_future, source_orphan_header_future, source_extra_future, + source_completion_future, source_go_offline_future, source_tick_stream, target_best_block_future, + target_incomplete_headers_future, target_extra_check_future, target_existence_status_future, target_submit_header_future, + target_complete_header_future, target_go_offline_future, target_tick_stream ); @@ -144,7 +180,7 @@ pub fn run( source_best_block_number, |source_best_block_number| sync.source_best_header_number_response(source_best_block_number), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client), + |source_client| delay(CONNECTION_ERROR_DELAY, source_client), || format!("Error retrieving best header number from {}", P::SOURCE_NAME), ); }, @@ -155,7 +191,7 @@ pub fn run( source_new_header, |source_new_header| sync.headers_mut().header_response(source_new_header), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client), + |source_client| delay(CONNECTION_ERROR_DELAY, source_client), || format!("Error retrieving header from {} node", P::SOURCE_NAME), ); }, @@ -166,7 +202,7 @@ pub fn run( source_orphan_header, |source_orphan_header| sync.headers_mut().header_response(source_orphan_header), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client), + |source_client| delay(CONNECTION_ERROR_DELAY, source_client), || format!("Error retrieving orphan header from {} node", P::SOURCE_NAME), ); }, @@ -177,10 +213,21 @@ pub fn run( source_extra, |(header, extra)| sync.headers_mut().extra_response(&header, extra), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client), + |source_client| delay(CONNECTION_ERROR_DELAY, source_client), || format!("Error retrieving extra data from {} node", P::SOURCE_NAME), ); }, + (source_client, source_completion) = source_completion_future => { + process_future_result( + &mut source_maybe_client, + source_client, + source_completion, + |(header, completion)| sync.headers_mut().completion_response(&header, completion), + &mut source_go_offline_future, + |source_client| delay(CONNECTION_ERROR_DELAY, source_client), + || format!("Error retrieving completion data from {} node", P::SOURCE_NAME), + ); + }, source_client = source_go_offline_future => { source_maybe_client = Some(source_client); }, @@ -199,21 +246,20 @@ pub fn run( |target_best_block| { let head_updated = sync.target_best_header_response(target_best_block); if head_updated { - last_update_time = std::time::Instant::now(); + last_update_time = Instant::now(); } match head_updated { // IF head is updated AND there are still our transactions: // => restart stall countdown timer true if sync.headers().headers_in_status(HeaderStatus::Submitted) != 0 => - stall_countdown = Some(std::time::Instant::now()), + stall_countdown = Some(Instant::now()), // IF head is updated AND there are no our transactions: // => stop stall countdown timer true => stall_countdown = None, // IF head is not updated AND stall countdown is not yet completed // => do nothing false if stall_countdown - .map(|stall_countdown| std::time::Instant::now() - stall_countdown < - std::time::Duration::from_millis(STALL_SYNC_TIMEOUT_MS)) + .map(|stall_countdown| stall_countdown.elapsed() < STALL_SYNC_TIMEOUT) .unwrap_or(true) => (), // IF head is not updated AND stall countdown has completed @@ -231,10 +277,23 @@ pub fn run( } }, &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client), + |target_client| delay(CONNECTION_ERROR_DELAY, target_client), || format!("Error retrieving best known header from {} node", P::TARGET_NAME), ); }, + (target_client, incomplete_headers_ids) = target_incomplete_headers_future => { + target_incomplete_headers_required = false; + + process_future_result( + &mut target_maybe_client, + target_client, + incomplete_headers_ids, + |incomplete_headers_ids| sync.headers_mut().incomplete_headers_response(incomplete_headers_ids), + &mut target_go_offline_future, + |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME), + ); + }, (target_client, target_existence_status) = target_existence_status_future => { process_future_result( &mut target_maybe_client, @@ -244,7 +303,7 @@ pub fn run( .headers_mut() .maybe_orphan_response(&target_header, target_existence_status), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client), + |target_client| delay(CONNECTION_ERROR_DELAY, target_client), || format!("Error retrieving existence status from {} node", P::TARGET_NAME), ); }, @@ -255,10 +314,21 @@ pub fn run( target_submit_header_result, |submitted_headers| sync.headers_mut().headers_submitted(submitted_headers), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client), + |target_client| delay(CONNECTION_ERROR_DELAY, target_client), || format!("Error submitting headers to {} node", P::TARGET_NAME), ); }, + (target_client, target_complete_header_result) = target_complete_header_future => { + process_future_result( + &mut target_maybe_client, + target_client, + target_complete_header_result, + |completed_header| sync.headers_mut().header_completed(&completed_header), + &mut target_go_offline_future, + |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || format!("Error completing headers at {}", P::TARGET_NAME), + ); + }, (target_client, target_extra_check_result) = target_extra_check_future => { process_future_result( &mut target_maybe_client, @@ -268,7 +338,7 @@ pub fn run( .headers_mut() .maybe_extra_response(&header, extra_check_result), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client), + |target_client| delay(CONNECTION_ERROR_DELAY, target_client), || format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME), ); }, @@ -277,6 +347,7 @@ pub fn run( }, _ = target_tick_stream.next() => { target_best_block_required = true; + target_incomplete_headers_required = true; }, } @@ -287,13 +358,26 @@ pub fn run( if let Some(target_client) = target_maybe_client.take() { // the priority is to: // 1) get best block - it stops us from downloading/submitting new blocks + we call it rarely; - // 2) check if we need extra data from source - it stops us from downloading/submitting new blocks; - // 3) check existence - it stops us from submitting new blocks; - // 4) submit header + // 2) get incomplete headers - it stops us from submitting new blocks + we call it rarely; + // 3) complete headers - it stops us from submitting new blocks; + // 4) check if we need extra data from source - it stops us from downloading/submitting new blocks; + // 5) check existence - it stops us from submitting new blocks; + // 6) submit header if target_best_block_required { log::debug!(target: "bridge", "Asking {} about best block", P::TARGET_NAME); target_best_block_future.set(target_client.best_header_id().fuse()); + } else if target_incomplete_headers_required { + log::debug!(target: "bridge", "Asking {} about incomplete headers", P::TARGET_NAME); + target_incomplete_headers_future.set(target_client.incomplete_headers_ids().fuse()); + } else if let Some((id, completion)) = sync.headers_mut().header_to_complete() { + log::debug!( + target: "bridge", + "Going to complete header: {:?}", + id, + ); + + target_complete_header_future.set(target_client.complete_header(id, completion.clone()).fuse()); } else if let Some(header) = sync.headers().header(HeaderStatus::MaybeExtra) { log::debug!( target: "bridge", @@ -314,9 +398,9 @@ pub fn run( ); target_existence_status_future.set(target_client.is_known_header(parent_id).fuse()); - } else if let Some(headers) = sync.select_headers_to_submit( - last_update_time.elapsed() > std::time::Duration::from_millis(BACKUP_STALL_SYNC_TIMEOUT_MS), - ) { + } else if let Some(headers) = + sync.select_headers_to_submit(last_update_time.elapsed() > BACKUP_STALL_SYNC_TIMEOUT) + { let ids = match headers.len() { 1 => format!("{:?}", headers[0].id()), 2 => format!("[{:?}, {:?}]", headers[0].id(), headers[1].id()), @@ -335,7 +419,7 @@ pub fn run( // remember that we have submitted some headers if stall_countdown.is_none() { - stall_countdown = Some(std::time::Instant::now()); + stall_countdown = Some(Instant::now()); } } else { target_maybe_client = Some(target_client); @@ -346,13 +430,21 @@ pub fn run( if let Some(source_client) = source_maybe_client.take() { // the priority is to: // 1) get best block - it stops us from downloading new blocks + we call it rarely; - // 2) download extra data - it stops us from submitting new blocks; - // 3) download missing headers - it stops us from downloading/submitting new blocks; - // 4) downloading new headers + // 2) download completion data - it stops us from submitting new blocks; + // 3) download extra data - it stops us from submitting new blocks; + // 4) download missing headers - it stops us from downloading/submitting new blocks; + // 5) downloading new headers if source_best_block_number_required { log::debug!(target: "bridge", "Asking {} node about best block number", P::SOURCE_NAME); source_best_block_number_future.set(source_client.best_block_number().fuse()); + } else if let Some(id) = sync.headers_mut().incomplete_header() { + log::debug!( + target: "bridge", + "Retrieving completion data for header: {:?}", + id, + ); + source_completion_future.set(source_client.header_completion(id).fuse()); } else if let Some(header) = sync.headers().header(HeaderStatus::Extra) { let id = header.id(); log::debug!( @@ -402,15 +494,15 @@ pub fn run( } /// Future that resolves into given value after given timeout. -async fn delay(timeout_ms: u64, retval: T) -> T { - async_std::task::sleep(std::time::Duration::from_millis(timeout_ms)).await; +async fn delay(timeout: Duration, retval: T) -> T { + async_std::task::sleep(timeout).await; retval } /// Stream that emits item every `timeout_ms` milliseconds. -fn interval(timeout_ms: u64) -> impl futures::Stream { +fn interval(timeout: Duration) -> impl futures::Stream { futures::stream::unfold((), move |_| async move { - delay(timeout_ms, ()).await; + delay(timeout, ()).await; Some(((), ())) }) } @@ -447,14 +539,14 @@ fn process_future_result( /// Print synchronization progress. fn print_sync_progress( - progress_context: (std::time::Instant, Option, Option), + progress_context: (Instant, Option, Option), eth_sync: &crate::sync::HeadersSync

, -) -> (std::time::Instant, Option, Option) { +) -> (Instant, Option, Option) { let (prev_time, prev_best_header, prev_target_header) = progress_context; - let now_time = std::time::Instant::now(); + let now_time = Instant::now(); let (now_best_header, now_target_header) = eth_sync.status(); - let need_update = now_time - prev_time > std::time::Duration::from_secs(10) + let need_update = now_time - prev_time > Duration::from_secs(10) || match (prev_best_header, now_best_header) { (Some(prev_best_header), Some(now_best_header)) => { now_best_header.0.saturating_sub(prev_best_header) > 10.into() diff --git a/bridges/relays/ethereum/src/sync_types.rs b/bridges/relays/ethereum/src/sync_types.rs index 9b9712c739..8d68bd8759 100644 --- a/bridges/relays/ethereum/src/sync_types.rs +++ b/bridges/relays/ethereum/src/sync_types.rs @@ -15,7 +15,7 @@ // along with Parity Bridges Common. If not, see . /// Ethereum header Id. -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)] pub struct HeaderId(pub Number, pub Hash); /// Ethereum header synchronization status. @@ -33,6 +33,8 @@ pub enum HeaderStatus { Extra, /// Header is in Ready queue. Ready, + /// Header is in Incomplete queue. + Incomplete, /// Header has been recently submitted to the target node. Submitted, /// Header is known to the target node. @@ -61,6 +63,7 @@ pub trait HeadersSyncPipeline: Clone + Copy { + Copy + std::fmt::Debug + std::fmt::Display + + std::hash::Hash + std::ops::Add + std::ops::Sub + num_traits::Saturating @@ -68,10 +71,24 @@ pub trait HeadersSyncPipeline: Clone + Copy { + num_traits::One; /// Type of header that we're syncing. type Header: Clone + std::fmt::Debug + SourceHeader; - /// Type of extra data for the header that we're receiving from the source node. + /// Type of extra data for the header that we're receiving from the source node: + /// 1) extra data is required for some headers; + /// 2) target node may answer if it'll require extra data before header is submitted; + /// 3) extra data available since the header creation time; + /// 4) header and extra data are submitted in single transaction. + /// + /// Example: Ethereum transactions receipts. type Extra: Clone + std::fmt::Debug; + /// Type of data required to 'complete' header that we're receiving from the source node: + /// 1) completion data is required for some headers; + /// 2) target node can't answer if it'll require completion data before header is accepted; + /// 3) completion data may be generated after header generation; + /// 4) header and completion data are submitted in separate transactions. + /// + /// Example: Substrate GRANDPA justifications. + type Completion: Clone + std::fmt::Debug; - /// Function used to convert from queued header to target header. + /// Function used to estimate size of target-encoded header. fn estimate_size(source: &QueuedHeader) -> usize; }