Sync ethereum headers using unsigned (substrate) transactions (#45)

* reward submitters on finalization

* PoA -> Substrate: unsigned_import_header API

* fix grumble

* make submitter part of ImportContext

* verify using next validators set + tests

* fix nostd compilation

* add sub-tx-mode argument

* support sub-tx-mode

* impl ValidateUnsigned for Runtime

* do not submit too much transactions to the pool

* cargo fmt

* fix bad merge

* revert license fix

* Update modules/ethereum/src/lib.rs

Co-Authored-By: Hernando Castano <HCastano@users.noreply.github.com>

* Update modules/ethereum/src/verification.rs

Co-Authored-By: Hernando Castano <HCastano@users.noreply.github.com>

* updated comment

* validate receipts before accepting unsigned tx to pool

* cargo fmt

* fix comment

* fix grumbles

* Update modules/ethereum/src/verification.rs

Co-Authored-By: Hernando Castano <HCastano@users.noreply.github.com>

* cargo fmt --all

* struct ChangeToEnact

* updated doc

* fix doc

* add docs to the code method

* simplify fn ancestry

* finish incomplete docs

* Update modules/ethereum/src/lib.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update modules/ethereum/src/lib.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* return err from unsigned_import_header

* get header once

* Update relays/ethereum/src/ethereum_sync.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* fix

* UnsignedTooFarInTheFuture -> Custom(err.code())

* updated ImportContext::last_signal_block

* cargo fmt --all

* rename runtime calls

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2020-04-07 15:53:59 +03:00
committed by Bastian Köcher
parent b055027161
commit c6c46462ab
12 changed files with 1043 additions and 211 deletions
+12 -1
View File
@@ -23,11 +23,22 @@ args:
value_name: SUB_PORT
help: Connect to Substrate node at given port.
takes_value: true
- sub-tx-mode:
long: sub-tx-mode
value_name: MODE
help: Submit headers using signed (default) or unsigned transactions. Third mode - backup - submits signed transactions only when we believe that sync has stalled.
takes_value: true
possible_values:
- signed
- unsigned
- backup
- sub-signer:
long: sub-signer
value_name: SUB_SIGNER
help: The SURI of secret key to use when transactions are submitted to the Substrate node.
takes_value: true
- sub-signer-password:
long: sub-signer-password
value_name: SUB_SIGNER_PASSWORD
help: The password for the SURI of secret key to use when transactions are submitted to the Substrate node.
help: The password for the SURI of secret key to use when transactions are submitted to the Substrate node.
takes_value: true
+36 -9
View File
@@ -15,7 +15,7 @@
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::ethereum_headers::QueuedHeaders;
use crate::ethereum_sync_loop::EthereumSyncParams;
use crate::ethereum_sync_loop::{EthereumSyncParams, SubstrateTransactionMode};
use crate::ethereum_types::{HeaderId, HeaderStatus, QueuedHeader};
use crate::substrate_types::{into_substrate_ethereum_header, into_substrate_ethereum_receipts};
use codec::Encode;
@@ -95,7 +95,12 @@ impl HeadersSync {
}
/// Select headers that need to be submitted to the Substrate node.
pub fn select_headers_to_submit(&self) -> Option<Vec<&QueuedHeader>> {
pub fn select_headers_to_submit(&self, stalled: bool) -> Option<Vec<&QueuedHeader>> {
// if we operate in backup mode, we only submit headers when sync has stalled
if self.params.sub_tx_mode == SubstrateTransactionMode::Backup && !stalled {
return None;
}
let headers_in_submit_status = self.headers.headers_in_status(HeaderStatus::Submitted);
let headers_to_submit_count = self
.params
@@ -224,7 +229,7 @@ mod tests {
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeReceipts), Some(&header(101)));
eth_sync.headers.maybe_receipts_response(&id(101), false);
assert_eq!(eth_sync.headers.header(HeaderStatus::Ready), Some(&header(101)));
assert_eq!(eth_sync.select_headers_to_submit(), Some(vec![&header(101)]));
assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(101)]));
// and header #102 is ready to be downloaded
assert_eq!(eth_sync.select_new_header_to_download(), Some(102));
@@ -238,13 +243,13 @@ mod tests {
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeReceipts), Some(&header(102)));
eth_sync.headers.maybe_receipts_response(&id(102), false);
assert_eq!(eth_sync.headers.header(HeaderStatus::Ready), Some(&header(102)));
assert_eq!(eth_sync.select_headers_to_submit(), None);
assert_eq!(eth_sync.select_headers_to_submit(false), None);
// substrate reports that it has imported block #101
eth_sync.substrate_best_header_response(id(101));
// and we are ready to submit #102
assert_eq!(eth_sync.select_headers_to_submit(), Some(vec![&header(102)]));
assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(102)]));
eth_sync.headers.headers_submitted(vec![id(102)]);
// substrate reports that it has imported block #102
@@ -269,7 +274,7 @@ mod tests {
eth_sync.headers.header_response(header(101).header().clone());
// we can't submit header #101, because its parent status is unknown
assert_eq!(eth_sync.select_headers_to_submit(), None);
assert_eq!(eth_sync.select_headers_to_submit(false), None);
// instead we are trying to determine status of its parent (#100)
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeOrphan), Some(&header(101)));
@@ -282,7 +287,7 @@ mod tests {
eth_sync.headers.header_response(header(100).header().clone());
// we can't submit header #100, because its parent status is unknown
assert_eq!(eth_sync.select_headers_to_submit(), None);
assert_eq!(eth_sync.select_headers_to_submit(false), None);
// instead we are trying to determine status of its parent (#99)
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeOrphan), Some(&header(100)));
@@ -293,13 +298,13 @@ mod tests {
// and we are ready to submit #100
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeReceipts), Some(&header(100)));
eth_sync.headers.maybe_receipts_response(&id(100), false);
assert_eq!(eth_sync.select_headers_to_submit(), Some(vec![&header(100)]));
assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(100)]));
eth_sync.headers.headers_submitted(vec![id(100)]);
// and we are ready to submit #101
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeReceipts), Some(&header(101)));
eth_sync.headers.maybe_receipts_response(&id(101), false);
assert_eq!(eth_sync.select_headers_to_submit(), Some(vec![&header(101)]));
assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(101)]));
eth_sync.headers.headers_submitted(vec![id(101)]);
}
@@ -310,4 +315,26 @@ mod tests {
eth_sync.substrate_best_header_response(id(100));
assert_eq!(eth_sync.headers.prune_border(), 50);
}
#[test]
fn only_submitting_headers_in_backup_mode_when_stalled() {
let mut eth_sync = HeadersSync::new(Default::default());
eth_sync.params.sub_tx_mode = SubstrateTransactionMode::Backup;
// ethereum reports best header #102
eth_sync.ethereum_best_header_number_response(102);
// substrate reports that it is at block #100
eth_sync.substrate_best_header_response(id(100));
// block #101 is downloaded first
eth_sync.headers.header_response(header(101).header().clone());
eth_sync.headers.maybe_receipts_response(&id(101), false);
// ensure that headers are not submitted when sync is not stalled
assert_eq!(eth_sync.select_headers_to_submit(false), None);
// ensure that headers are not submitted when sync is stalled
assert_eq!(eth_sync.select_headers_to_submit(true), Some(vec![&header(101)]));
}
}
@@ -37,6 +37,9 @@ const SUBSTRATE_TICK_INTERVAL_MS: u64 = 5_000;
/// the subscriber will receive every best header (2) reorg won't always lead to sync
/// stall and restart is a heavy operation (we forget all in-memory headers).
const STALL_SYNC_TIMEOUT_MS: u64 = 30_000;
/// Delay (in milliseconds) after we have seen update of best Ethereum header in Substrate,
/// for us to treat sync stalled. ONLY when relay operates in backup mode.
const BACKUP_STALL_SYNC_TIMEOUT_MS: u64 = 5 * 60_000;
/// Delay (in milliseconds) after connection-related error happened before we'll try
/// reconnection again.
const CONNECTION_ERROR_DELAY_MS: u64 = 10_000;
@@ -57,6 +60,8 @@ pub struct EthereumSyncParams {
pub sub_host: String,
/// Substrate RPC port.
pub sub_port: u16,
/// Substrate transactions submission mode.
pub sub_tx_mode: SubstrateTransactionMode,
/// Substrate transactions signer.
pub sub_signer: sp_core::sr25519::Pair,
/// Maximal number of ethereum headers to pre-download.
@@ -72,6 +77,18 @@ pub struct EthereumSyncParams {
pub prune_depth: u64,
}
/// Substrate transaction mode.
#[derive(Debug, PartialEq)]
pub enum SubstrateTransactionMode {
/// Submit eth headers using signed substrate transactions.
Signed,
/// Submit eth headers using unsigned substrate transactions.
Unsigned,
/// Submit eth headers using signed substrate transactions, but only when we
/// believe that sync has stalled.
Backup,
}
impl std::fmt::Debug for EthereumSyncParams {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("EthereumSyncParams")
@@ -79,6 +96,7 @@ impl std::fmt::Debug for EthereumSyncParams {
.field("eth_port", &self.eth_port)
.field("sub_host", &self.sub_port)
.field("sub_port", &self.sub_port)
.field("sub_tx_mode", &self.sub_tx_mode)
.field("max_future_headers_to_download", &self.max_future_headers_to_download)
.field("max_headers_in_submitted_status", &self.max_headers_in_submitted_status)
.field("max_headers_in_single_submit", &self.max_headers_in_single_submit)
@@ -98,6 +116,7 @@ impl Default for EthereumSyncParams {
eth_port: 8545,
sub_host: "localhost".into(),
sub_port: 9933,
sub_tx_mode: SubstrateTransactionMode::Signed,
sub_signer: sp_keyring::AccountKeyring::Alice.pair(),
max_future_headers_to_download: 128,
max_headers_in_submitted_status: 128,
@@ -112,6 +131,10 @@ impl Default for EthereumSyncParams {
pub fn run(params: EthereumSyncParams) {
let mut local_pool = futures::executor::LocalPool::new();
let mut progress_context = (std::time::Instant::now(), None, None);
let sign_sub_transactions = match params.sub_tx_mode {
SubstrateTransactionMode::Signed | SubstrateTransactionMode::Backup => true,
SubstrateTransactionMode::Unsigned => false,
};
local_pool.run_until(async move {
let eth_uri = format!("http://{}:{}", params.eth_host, params.eth_port);
@@ -120,6 +143,7 @@ pub fn run(params: EthereumSyncParams) {
let mut eth_sync = crate::ethereum_sync::HeadersSync::new(params);
let mut stall_countdown = None;
let mut last_update_time = std::time::Instant::now();
let mut eth_maybe_client = None;
let mut eth_best_block_number_required = false;
@@ -220,6 +244,9 @@ pub fn run(params: EthereumSyncParams) {
sub_best_block,
|sub_best_block| {
let head_updated = eth_sync.substrate_best_header_response(sub_best_block);
if head_updated {
last_update_time = std::time::Instant::now();
}
match head_updated {
// IF head is updated AND there are still our transactions:
// => restart stall countdown timer
@@ -336,7 +363,9 @@ pub fn run(params: EthereumSyncParams) {
sub_existence_status_future
.set(substrate_client::ethereum_header_known(sub_client, parent_id).fuse());
} else if let Some(headers) = eth_sync.select_headers_to_submit() {
} else if let Some(headers) = eth_sync.select_headers_to_submit(
last_update_time.elapsed() > std::time::Duration::from_millis(BACKUP_STALL_SYNC_TIMEOUT_MS),
) {
let ids = match headers.len() {
1 => format!("{:?}", headers[0].id()),
2 => format!("[{:?}, {:?}]", headers[0].id(), headers[1].id()),
@@ -350,7 +379,9 @@ pub fn run(params: EthereumSyncParams) {
);
let headers = headers.into_iter().cloned().collect();
sub_submit_header_future.set(substrate_client::submit_ethereum_headers(sub_client, headers).fuse());
sub_submit_header_future.set(
substrate_client::submit_ethereum_headers(sub_client, headers, sign_sub_transactions).fuse(),
);
// remember that we have submitted some headers
if stall_countdown.is_none() {
+13
View File
@@ -99,5 +99,18 @@ fn ethereum_sync_params() -> Result<ethereum_sync_loop::EthereumSyncParams, Stri
sp_core::sr25519::Pair::from_string(sub_signer, sub_signer_password).map_err(|e| format!("{:?}", e))?;
}
match matches.value_of("sub-tx-mode") {
Some("signed") => eth_sync_params.sub_tx_mode = ethereum_sync_loop::SubstrateTransactionMode::Signed,
Some("unsigned") => {
eth_sync_params.sub_tx_mode = ethereum_sync_loop::SubstrateTransactionMode::Unsigned;
// tx pool won't accept too much unsigned transactions
eth_sync_params.max_headers_in_submitted_status = 10;
}
Some("backup") => eth_sync_params.sub_tx_mode = ethereum_sync_loop::SubstrateTransactionMode::Backup,
Some(mode) => return Err(format!("Invalid sub-tx-mode: {}", mode)),
None => eth_sync_params.sub_tx_mode = ethereum_sync_loop::SubstrateTransactionMode::Signed,
}
Ok(eth_sync_params)
}
+76 -17
View File
@@ -132,7 +132,19 @@ pub async fn ethereum_header_known(
pub async fn submit_ethereum_headers(
client: Client,
headers: Vec<QueuedEthereumHeader>,
) -> (Client, Result<(TransactionHash, Vec<EthereumHeaderId>), Error>) {
sign_transactions: bool,
) -> (Client, Result<(Vec<TransactionHash>, Vec<EthereumHeaderId>), Error>) {
match sign_transactions {
true => submit_signed_ethereum_headers(client, headers).await,
false => submit_unsigned_ethereum_headers(client, headers).await,
}
}
/// Submits signed Ethereum header to Substrate runtime.
pub async fn submit_signed_ethereum_headers(
client: Client,
headers: Vec<QueuedEthereumHeader>,
) -> (Client, Result<(Vec<TransactionHash>, Vec<EthereumHeaderId>), Error>) {
let ids = headers.iter().map(|header| header.id()).collect();
let (client, genesis_hash) = match client.genesis_hash {
Some(genesis_hash) => (client, genesis_hash),
@@ -152,7 +164,9 @@ pub async fn submit_ethereum_headers(
Ok(nonce) => nonce,
Err(err) => return (client, Err(err)),
};
let transaction = create_submit_transaction(headers, &client.signer, nonce, genesis_hash);
let transaction = create_signed_submit_transaction(headers, &client.signer, nonce, genesis_hash);
let encoded_transaction = transaction.encode();
let (client, transaction_hash) = call_rpc(
client,
@@ -160,7 +174,39 @@ pub async fn submit_ethereum_headers(
Params::Array(vec![to_value(Bytes(encoded_transaction)).unwrap()]),
)
.await;
(client, transaction_hash.map(|transaction_hash| (transaction_hash, ids)))
(
client,
transaction_hash.map(|transaction_hash| (vec![transaction_hash], ids)),
)
}
/// Submits unsigned Ethereum header to Substrate runtime.
pub async fn submit_unsigned_ethereum_headers(
mut client: Client,
headers: Vec<QueuedEthereumHeader>,
) -> (Client, Result<(Vec<TransactionHash>, Vec<EthereumHeaderId>), Error>) {
let ids = headers.iter().map(|header| header.id()).collect();
let mut transactions_hashes = Vec::new();
for header in headers {
let transaction = create_unsigned_submit_transaction(header);
let encoded_transaction = transaction.encode();
let (used_client, transaction_hash) = call_rpc(
client,
"author_submitExtrinsic",
Params::Array(vec![to_value(Bytes(encoded_transaction)).unwrap()]),
)
.await;
client = used_client;
transactions_hashes.push(match transaction_hash {
Ok(transaction_hash) => transaction_hash,
Err(error) => return (client, Err(error)),
});
}
(client, Ok((transactions_hashes, ids)))
}
/// Get Substrate block hash by its number.
@@ -236,25 +282,26 @@ async fn call_rpc_u64(mut client: Client, method: &'static str, params: Params)
(client, result)
}
/// Create Substrate transaction for submitting Ethereum header.
fn create_submit_transaction(
/// Create signed Substrate transaction for submitting Ethereum headers.
fn create_signed_submit_transaction(
headers: Vec<QueuedEthereumHeader>,
signer: &sp_core::sr25519::Pair,
index: node_primitives::Index,
genesis_hash: H256,
) -> bridge_node_runtime::UncheckedExtrinsic {
let function = bridge_node_runtime::Call::BridgeEthPoA(bridge_node_runtime::BridgeEthPoACall::import_headers(
headers
.into_iter()
.map(|header| {
let (header, receipts) = header.extract();
(
into_substrate_ethereum_header(&header),
into_substrate_ethereum_receipts(&receipts),
)
})
.collect(),
));
let function =
bridge_node_runtime::Call::BridgeEthPoA(bridge_node_runtime::BridgeEthPoACall::import_signed_headers(
headers
.into_iter()
.map(|header| {
let (header, receipts) = header.extract();
(
into_substrate_ethereum_header(&header),
into_substrate_ethereum_receipts(&receipts),
)
})
.collect(),
));
let extra = |i: node_primitives::Index, f: node_primitives::Balance| {
(
@@ -284,3 +331,15 @@ fn create_submit_transaction(
bridge_node_runtime::UncheckedExtrinsic::new_signed(function, signer.into_account().into(), signature.into(), extra)
}
/// Create unsigned Substrate transaction for submitting Ethereum header.
fn create_unsigned_submit_transaction(header: QueuedEthereumHeader) -> bridge_node_runtime::UncheckedExtrinsic {
let (header, receipts) = header.extract();
let function =
bridge_node_runtime::Call::BridgeEthPoA(bridge_node_runtime::BridgeEthPoACall::import_unsigned_header(
into_substrate_ethereum_header(&header),
into_substrate_ethereum_receipts(&receipts),
));
bridge_node_runtime::UncheckedExtrinsic::new_unsigned(function)
}