mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 02:51:01 +00:00
pause relays(s) when node is syncing (#605)
This commit is contained in:
committed by
Bastian Köcher
parent
8ee90afae6
commit
8953967d92
@@ -16,8 +16,8 @@
|
|||||||
|
|
||||||
use crate::rpc::Ethereum;
|
use crate::rpc::Ethereum;
|
||||||
use crate::types::{
|
use crate::types::{
|
||||||
Address, Bytes, CallRequest, Header, HeaderWithTransactions, Receipt, SignedRawTx, Transaction, TransactionHash,
|
Address, Bytes, CallRequest, Header, HeaderWithTransactions, Receipt, SignedRawTx, SyncState, Transaction,
|
||||||
H256, U256,
|
TransactionHash, H256, U256,
|
||||||
};
|
};
|
||||||
use crate::{ConnectionParams, Error, Result};
|
use crate::{ConnectionParams, Error, Result};
|
||||||
|
|
||||||
@@ -25,6 +25,9 @@ use jsonrpsee::raw::RawClient;
|
|||||||
use jsonrpsee::transport::http::HttpTransportClient;
|
use jsonrpsee::transport::http::HttpTransportClient;
|
||||||
use jsonrpsee::Client as RpcClient;
|
use jsonrpsee::Client as RpcClient;
|
||||||
|
|
||||||
|
/// Number of headers missing from the Ethereum node for us to consider node not synced.
|
||||||
|
const MAJOR_SYNC_BLOCKS: u64 = 5;
|
||||||
|
|
||||||
/// The client used to interact with an Ethereum node through RPC.
|
/// The client used to interact with an Ethereum node through RPC.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Client {
|
pub struct Client {
|
||||||
@@ -53,6 +56,23 @@ impl Client {
|
|||||||
pub fn reconnect(&mut self) {
|
pub fn reconnect(&mut self) {
|
||||||
self.client = Self::build_client(&self.params);
|
self.client = Self::build_client(&self.params);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Client {
|
||||||
|
/// Returns true if client is connected to at least one peer and is in synced state.
|
||||||
|
pub async fn ensure_synced(&self) -> Result<()> {
|
||||||
|
match Ethereum::syncing(&self.client).await? {
|
||||||
|
SyncState::NotSyncing => Ok(()),
|
||||||
|
SyncState::Syncing(syncing) => {
|
||||||
|
let missing_headers = syncing.highest_block.saturating_sub(syncing.current_block);
|
||||||
|
if missing_headers > MAJOR_SYNC_BLOCKS.into() {
|
||||||
|
return Err(Error::ClientNotSynced(missing_headers));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Estimate gas usage for the given call.
|
/// Estimate gas usage for the given call.
|
||||||
pub async fn estimate_gas(&self, call_request: CallRequest) -> Result<U256> {
|
pub async fn estimate_gas(&self, call_request: CallRequest) -> Result<U256> {
|
||||||
|
|||||||
@@ -16,6 +16,8 @@
|
|||||||
|
|
||||||
//! Ethereum node RPC errors.
|
//! Ethereum node RPC errors.
|
||||||
|
|
||||||
|
use crate::types::U256;
|
||||||
|
|
||||||
use jsonrpsee::client::RequestError;
|
use jsonrpsee::client::RequestError;
|
||||||
use relay_utils::MaybeConnectionError;
|
use relay_utils::MaybeConnectionError;
|
||||||
|
|
||||||
@@ -40,6 +42,9 @@ pub enum Error {
|
|||||||
InvalidSubstrateBlockNumber,
|
InvalidSubstrateBlockNumber,
|
||||||
/// An invalid index has been received from an Ethereum node.
|
/// An invalid index has been received from an Ethereum node.
|
||||||
InvalidIncompleteIndex,
|
InvalidIncompleteIndex,
|
||||||
|
/// The client we're connected to is not synced, so we can't rely on its state. Contains
|
||||||
|
/// number of unsynced headers.
|
||||||
|
ClientNotSynced(U256),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<RequestError> for Error {
|
impl From<RequestError> for Error {
|
||||||
@@ -50,7 +55,11 @@ impl From<RequestError> for Error {
|
|||||||
|
|
||||||
impl MaybeConnectionError for Error {
|
impl MaybeConnectionError for Error {
|
||||||
fn is_connection_error(&self) -> bool {
|
fn is_connection_error(&self) -> bool {
|
||||||
matches!(*self, Error::Request(RequestError::TransportError(_)))
|
matches!(
|
||||||
|
*self,
|
||||||
|
Error::Request(RequestError::TransportError(_))
|
||||||
|
| Error::ClientNotSynced(_),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -66,6 +75,9 @@ impl ToString for Error {
|
|||||||
Self::IncompleteTransaction => "Incomplete Ethereum Transaction (missing required field - raw)".to_string(),
|
Self::IncompleteTransaction => "Incomplete Ethereum Transaction (missing required field - raw)".to_string(),
|
||||||
Self::InvalidSubstrateBlockNumber => "Received an invalid Substrate block from Ethereum Node".to_string(),
|
Self::InvalidSubstrateBlockNumber => "Received an invalid Substrate block from Ethereum Node".to_string(),
|
||||||
Self::InvalidIncompleteIndex => "Received an invalid incomplete index from Ethereum Node".to_string(),
|
Self::InvalidIncompleteIndex => "Received an invalid incomplete index from Ethereum Node".to_string(),
|
||||||
|
Self::ClientNotSynced(missing_headers) => {
|
||||||
|
format!("Ethereum client is not synced: syncing {} headers", missing_headers)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,11 +22,14 @@
|
|||||||
#![allow(unused_variables)]
|
#![allow(unused_variables)]
|
||||||
|
|
||||||
use crate::types::{
|
use crate::types::{
|
||||||
Address, Bytes, CallRequest, Header, HeaderWithTransactions, Receipt, Transaction, TransactionHash, H256, U256, U64,
|
Address, Bytes, CallRequest, Header, HeaderWithTransactions, Receipt, SyncState, Transaction, TransactionHash,
|
||||||
|
H256, U256, U64,
|
||||||
};
|
};
|
||||||
|
|
||||||
jsonrpsee::rpc_api! {
|
jsonrpsee::rpc_api! {
|
||||||
pub(crate) Ethereum {
|
pub(crate) Ethereum {
|
||||||
|
#[rpc(method = "eth_syncing", positional_params)]
|
||||||
|
fn syncing() -> SyncState;
|
||||||
#[rpc(method = "eth_estimateGas", positional_params)]
|
#[rpc(method = "eth_estimateGas", positional_params)]
|
||||||
fn estimate_gas(call_request: CallRequest) -> U256;
|
fn estimate_gas(call_request: CallRequest) -> U256;
|
||||||
#[rpc(method = "eth_blockNumber", positional_params)]
|
#[rpc(method = "eth_blockNumber", positional_params)]
|
||||||
|
|||||||
@@ -18,7 +18,7 @@
|
|||||||
|
|
||||||
use headers_relay::sync_types::SourceHeader;
|
use headers_relay::sync_types::SourceHeader;
|
||||||
|
|
||||||
pub use web3::types::{Address, Bytes, CallRequest, H256, U128, U256, U64};
|
pub use web3::types::{Address, Bytes, CallRequest, SyncState, H256, U128, U256, U64};
|
||||||
|
|
||||||
/// When header is just received from the Ethereum node, we check that it has
|
/// When header is just received from the Ethereum node, we check that it has
|
||||||
/// both number and hash fields filled.
|
/// both number and hash fields filled.
|
||||||
|
|||||||
@@ -248,6 +248,10 @@ impl TargetClient<EthereumToSubstrateExchange> for SubstrateTransactionsTarget {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn best_finalized_header_id(&self) -> Result<EthereumHeaderId, RpcError> {
|
async fn best_finalized_header_id(&self) -> Result<EthereumHeaderId, RpcError> {
|
||||||
|
// we can't continue to relay exchange proofs if Substrate node is out of sync, because
|
||||||
|
// it may have already received (some of) proofs that we're going to relay
|
||||||
|
self.client.ensure_synced().await?;
|
||||||
|
|
||||||
self.client.best_ethereum_finalized_block().await
|
self.client.best_ethereum_finalized_block().await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -130,6 +130,9 @@ impl RelayClient for EthereumHeadersSource {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl SourceClient<EthereumHeadersSyncPipeline> for EthereumHeadersSource {
|
impl SourceClient<EthereumHeadersSyncPipeline> for EthereumHeadersSource {
|
||||||
async fn best_block_number(&self) -> Result<u64, RpcError> {
|
async fn best_block_number(&self) -> Result<u64, RpcError> {
|
||||||
|
// we **CAN** continue to relay headers if Ethereum node is out of sync, because
|
||||||
|
// Substrate node may be missing headers that are already available at the Ethereum
|
||||||
|
|
||||||
self.client.best_block_number().await.map_err(Into::into)
|
self.client.best_block_number().await.map_err(Into::into)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -204,6 +207,10 @@ impl RelayClient for SubstrateHeadersTarget {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
|
impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
|
||||||
async fn best_header_id(&self) -> Result<EthereumHeaderId, RpcError> {
|
async fn best_header_id(&self) -> Result<EthereumHeaderId, RpcError> {
|
||||||
|
// we can't continue to relay headers if Substrate node is out of sync, because
|
||||||
|
// it may have already received (some of) headers that we're going to relay
|
||||||
|
self.client.ensure_synced().await?;
|
||||||
|
|
||||||
self.client.best_ethereum_block().await
|
self.client.best_ethereum_block().await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -131,6 +131,10 @@ impl RelayClient for EthereumHeadersTarget {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
|
impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
|
||||||
async fn best_header_id(&self) -> Result<RialtoHeaderId, RpcError> {
|
async fn best_header_id(&self) -> Result<RialtoHeaderId, RpcError> {
|
||||||
|
// we can't continue to relay headers if Ethereum node is out of sync, because
|
||||||
|
// it may have already received (some of) headers that we're going to relay
|
||||||
|
self.client.ensure_synced().await?;
|
||||||
|
|
||||||
self.client.best_substrate_block(self.contract).await
|
self.client.best_substrate_block(self.contract).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ relay-utils = { path = "../utils" }
|
|||||||
frame-support = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
frame-support = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
||||||
frame-system = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
frame-system = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
||||||
pallet-balances = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
pallet-balances = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
||||||
|
sc-rpc-api = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
||||||
sp-core = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
sp-core = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
||||||
sp-runtime = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
sp-runtime = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
||||||
sp-std = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
sp-std = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
|
||||||
|
|||||||
@@ -104,6 +104,17 @@ impl<C: Chain> Client<C> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<C: Chain> Client<C> {
|
impl<C: Chain> Client<C> {
|
||||||
|
/// Returns true if client is connected to at least one peer and is in synced state.
|
||||||
|
pub async fn ensure_synced(&self) -> Result<()> {
|
||||||
|
let health = Substrate::<C, _, _>::system_health(&self.client).await?;
|
||||||
|
let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0);
|
||||||
|
if is_synced {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(Error::ClientNotSynced(health))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Return hash of the genesis block.
|
/// Return hash of the genesis block.
|
||||||
pub fn genesis_hash(&self) -> &C::Hash {
|
pub fn genesis_hash(&self) -> &C::Hash {
|
||||||
&self.genesis_hash
|
&self.genesis_hash
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
use jsonrpsee::client::RequestError;
|
use jsonrpsee::client::RequestError;
|
||||||
use jsonrpsee::transport::ws::WsNewDnsError;
|
use jsonrpsee::transport::ws::WsNewDnsError;
|
||||||
use relay_utils::MaybeConnectionError;
|
use relay_utils::MaybeConnectionError;
|
||||||
|
use sc_rpc_api::system::Health;
|
||||||
|
|
||||||
/// Result type used by Substrate client.
|
/// Result type used by Substrate client.
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
@@ -38,6 +39,8 @@ pub enum Error {
|
|||||||
UninitializedBridgePallet,
|
UninitializedBridgePallet,
|
||||||
/// Account does not exist on the chain.
|
/// Account does not exist on the chain.
|
||||||
AccountDoesNotExist,
|
AccountDoesNotExist,
|
||||||
|
/// The client we're connected to is not synced, so we can't rely on its state.
|
||||||
|
ClientNotSynced(Health),
|
||||||
/// Custom logic error.
|
/// Custom logic error.
|
||||||
Custom(String),
|
Custom(String),
|
||||||
}
|
}
|
||||||
@@ -56,7 +59,11 @@ impl From<RequestError> for Error {
|
|||||||
|
|
||||||
impl MaybeConnectionError for Error {
|
impl MaybeConnectionError for Error {
|
||||||
fn is_connection_error(&self) -> bool {
|
fn is_connection_error(&self) -> bool {
|
||||||
matches!(*self, Error::Request(RequestError::TransportError(_)))
|
matches!(
|
||||||
|
*self,
|
||||||
|
Error::Request(RequestError::TransportError(_))
|
||||||
|
| Error::ClientNotSynced(_)
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -74,6 +81,7 @@ impl ToString for Error {
|
|||||||
Self::ResponseParseFailed(e) => e.what().to_string(),
|
Self::ResponseParseFailed(e) => e.what().to_string(),
|
||||||
Self::UninitializedBridgePallet => "The Substrate bridge pallet has not been initialized yet.".into(),
|
Self::UninitializedBridgePallet => "The Substrate bridge pallet has not been initialized yet.".into(),
|
||||||
Self::AccountDoesNotExist => "Account does not exist on the chain".into(),
|
Self::AccountDoesNotExist => "Account does not exist on the chain".into(),
|
||||||
|
Self::ClientNotSynced(health) => format!("Substrate client is not synced: {}", health),
|
||||||
Self::Custom(e) => e.clone(),
|
Self::Custom(e) => e.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -73,6 +73,8 @@ where
|
|||||||
P::Header: SourceHeader<C::Hash, C::BlockNumber>,
|
P::Header: SourceHeader<C::Hash, C::BlockNumber>,
|
||||||
{
|
{
|
||||||
async fn best_block_number(&self) -> Result<P::Number, Error> {
|
async fn best_block_number(&self) -> Result<P::Number, Error> {
|
||||||
|
// we **CAN** continue to relay headers if source node is out of sync, because
|
||||||
|
// target node may be missing headers that are already available at the source
|
||||||
Ok(*self.client.best_header().await?.number())
|
Ok(*self.client.best_header().await?.number())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ use crate::chain::Chain;
|
|||||||
|
|
||||||
use bp_message_lane::{LaneId, MessageNonce};
|
use bp_message_lane::{LaneId, MessageNonce};
|
||||||
use bp_runtime::InstanceId;
|
use bp_runtime::InstanceId;
|
||||||
|
use sc_rpc_api::system::Health;
|
||||||
use sp_core::{
|
use sp_core::{
|
||||||
storage::{StorageData, StorageKey},
|
storage::{StorageData, StorageKey},
|
||||||
Bytes,
|
Bytes,
|
||||||
@@ -33,6 +34,8 @@ use sp_version::RuntimeVersion;
|
|||||||
|
|
||||||
jsonrpsee::rpc_api! {
|
jsonrpsee::rpc_api! {
|
||||||
pub(crate) Substrate<C: Chain> {
|
pub(crate) Substrate<C: Chain> {
|
||||||
|
#[rpc(method = "system_health", positional_params)]
|
||||||
|
fn system_health() -> Health;
|
||||||
#[rpc(method = "chain_getHeader", positional_params)]
|
#[rpc(method = "chain_getHeader", positional_params)]
|
||||||
fn chain_get_header(block_hash: Option<C::Hash>) -> C::Header;
|
fn chain_get_header(block_hash: Option<C::Hash>) -> C::Header;
|
||||||
#[rpc(method = "chain_getFinalizedHead", positional_params)]
|
#[rpc(method = "chain_getFinalizedHead", positional_params)]
|
||||||
|
|||||||
@@ -73,6 +73,10 @@ where
|
|||||||
P: SubstrateHeadersSyncPipeline<Completion = Justification, Extra = ()>,
|
P: SubstrateHeadersSyncPipeline<Completion = Justification, Extra = ()>,
|
||||||
{
|
{
|
||||||
async fn best_header_id(&self) -> Result<HeaderIdOf<P>, SubstrateError> {
|
async fn best_header_id(&self) -> Result<HeaderIdOf<P>, SubstrateError> {
|
||||||
|
// we can't continue to relay headers if target node is out of sync, because
|
||||||
|
// it may have already received (some of) headers that we're going to relay
|
||||||
|
self.client.ensure_synced().await?;
|
||||||
|
|
||||||
let call = P::BEST_BLOCK_METHOD.into();
|
let call = P::BEST_BLOCK_METHOD.into();
|
||||||
let data = Bytes(Vec::new());
|
let data = Bytes(Vec::new());
|
||||||
|
|
||||||
|
|||||||
@@ -99,6 +99,10 @@ where
|
|||||||
P::TargetHeaderHash: Decode,
|
P::TargetHeaderHash: Decode,
|
||||||
{
|
{
|
||||||
async fn state(&self) -> Result<SourceClientState<P>, SubstrateError> {
|
async fn state(&self) -> Result<SourceClientState<P>, SubstrateError> {
|
||||||
|
// we can't continue to deliver confirmations if source node is out of sync, because
|
||||||
|
// it may have already received confirmations that we're going to deliver
|
||||||
|
self.client.ensure_synced().await?;
|
||||||
|
|
||||||
read_client_state::<_, P::TargetHeaderHash, P::TargetHeaderNumber>(
|
read_client_state::<_, P::TargetHeaderHash, P::TargetHeaderNumber>(
|
||||||
&self.client,
|
&self.client,
|
||||||
P::BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE,
|
P::BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE,
|
||||||
|
|||||||
@@ -95,6 +95,10 @@ where
|
|||||||
P::SourceHeaderHash: Decode,
|
P::SourceHeaderHash: Decode,
|
||||||
{
|
{
|
||||||
async fn state(&self) -> Result<TargetClientState<P>, SubstrateError> {
|
async fn state(&self) -> Result<TargetClientState<P>, SubstrateError> {
|
||||||
|
// we can't continue to deliver messages if target node is out of sync, because
|
||||||
|
// it may have already received (some of) messages that we're going to deliver
|
||||||
|
self.client.ensure_synced().await?;
|
||||||
|
|
||||||
read_client_state::<_, P::SourceHeaderHash, P::SourceHeaderNumber>(
|
read_client_state::<_, P::SourceHeaderHash, P::SourceHeaderNumber>(
|
||||||
&self.client,
|
&self.client,
|
||||||
P::BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET,
|
P::BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET,
|
||||||
|
|||||||
Reference in New Issue
Block a user