From 6550c5ff7f6549e64e5a515f04bae6fc44bdb644 Mon Sep 17 00:00:00 2001 From: Omar Abdulla Date: Tue, 13 Jan 2026 16:11:30 +0300 Subject: [PATCH] Add a `ReceiptRetryLayer` for providers --- crates/node/src/node_implementations/geth.rs | 63 +------- .../node_implementations/lighthouse_geth.rs | 83 ++-------- .../polkadot_omni_node.rs | 17 +- .../src/node_implementations/substrate.rs | 17 +- .../src/node_implementations/zombienet.rs | 19 +-- crates/node/src/provider_utils/mod.rs | 2 + crates/node/src/provider_utils/provider.rs | 86 ++-------- .../src/provider_utils/receipt_retry_layer.rs | 151 ++++++++++++++++++ 8 files changed, 209 insertions(+), 229 deletions(-) create mode 100644 crates/node/src/provider_utils/receipt_retry_layer.rs diff --git a/crates/node/src/node_implementations/geth.rs b/crates/node/src/node_implementations/geth.rs index fbf2433..0ed35da 100644 --- a/crates/node/src/node_implementations/geth.rs +++ b/crates/node/src/node_implementations/geth.rs @@ -35,7 +35,7 @@ use anyhow::Context as _; use futures::{FutureExt, Stream, StreamExt}; use revive_common::EVMVersion; use tokio::sync::OnceCell; -use tracing::{Instrument, error, instrument}; +use tracing::{error, instrument}; use revive_dt_common::{ fs::clear_directory, @@ -90,10 +90,8 @@ impl GethNode { const READY_MARKER: &str = "IPC endpoint opened"; const ERROR_MARKER: &str = "Fatal:"; - const TRANSACTION_INDEXING_ERROR: &str = "transaction indexing is in progress"; const TRANSACTION_TRACING_ERROR: &str = "historical state not available in path scheme yet"; - const RECEIPT_POLLING_DURATION: Duration = Duration::from_secs(5 * 60); const TRACE_POLLING_DURATION: Duration = Duration::from_secs(60); pub fn new( @@ -341,62 +339,15 @@ impl EthereumNode for GethNode { transaction: TransactionRequest, ) -> Pin> + '_>> { Box::pin(async move { - let provider = self - .provider() + self.provider() .await - .context("Failed to create provider for transaction submission")?; - - let pending_transaction = provider + .context("Failed to create provider for transaction submission")? .send_transaction(transaction) .await - .inspect_err( - |err| error!(%err, "Encountered an error when submitting the transaction"), - ) - .context("Failed to submit transaction to geth node")?; - let transaction_hash = *pending_transaction.tx_hash(); - - // The following is a fix for the "transaction indexing is in progress" error that we used - // to get. You can find more information on this in the following GH issue in geth - // https://github.com/ethereum/go-ethereum/issues/28877. To summarize what's going on, - // before we can get the receipt of the transaction it needs to have been indexed by the - // node's indexer. Just because the transaction has been confirmed it doesn't mean that it - // has been indexed. When we call alloy's `get_receipt` it checks if the transaction was - // confirmed. If it has been, then it will call `eth_getTransactionReceipt` method which - // _might_ return the above error if the tx has not yet been indexed yet. So, we need to - // implement a retry mechanism for the receipt to keep retrying to get it until it - // eventually works, but we only do that if the error we get back is the "transaction - // indexing is in progress" error or if the receipt is None. - // - // Getting the transaction indexed and taking a receipt can take a long time especially when - // a lot of transactions are being submitted to the node. Thus, while initially we only - // allowed for 60 seconds of waiting with a 1 second delay in polling, we need to allow for - // a larger wait time. Therefore, in here we allow for 5 minutes of waiting with exponential - // backoff each time we attempt to get the receipt and find that it's not available. - poll( - Self::RECEIPT_POLLING_DURATION, - PollingWaitBehavior::Constant(Duration::from_millis(200)), - move || { - let provider = provider.clone(); - async move { - match provider.get_transaction_receipt(transaction_hash).await { - Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)), - Ok(None) => Ok(ControlFlow::Continue(())), - Err(error) => { - let error_string = error.to_string(); - match error_string.contains(Self::TRANSACTION_INDEXING_ERROR) { - true => Ok(ControlFlow::Continue(())), - false => Err(error.into()), - } - } - } - } - }, - ) - .instrument(tracing::info_span!( - "Awaiting transaction receipt", - ?transaction_hash - )) - .await + .context("Encountered an error when submitting a transaction")? + .get_receipt() + .await + .context("Failed to get the receipt for the transaction") }) } diff --git a/crates/node/src/node_implementations/lighthouse_geth.rs b/crates/node/src/node_implementations/lighthouse_geth.rs index b59bd4a..664f9b2 100644 --- a/crates/node/src/node_implementations/lighthouse_geth.rs +++ b/crates/node/src/node_implementations/lighthouse_geth.rs @@ -48,7 +48,7 @@ use revive_common::EVMVersion; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_with::serde_as; use tokio::sync::OnceCell; -use tracing::{Instrument, info, instrument}; +use tracing::{info, instrument}; use revive_dt_common::{ fs::clear_directory, @@ -116,10 +116,8 @@ impl LighthouseGethNode { const CONFIG_FILE_NAME: &str = "config.yaml"; - const TRANSACTION_INDEXING_ERROR: &str = "transaction indexing is in progress"; const TRANSACTION_TRACING_ERROR: &str = "historical state not available in path scheme yet"; - const RECEIPT_POLLING_DURATION: Duration = Duration::from_secs(5 * 60); const TRACE_POLLING_DURATION: Duration = Duration::from_secs(60); const VALIDATOR_MNEMONIC: &str = "giant issue aisle success illegal bike spike question tent bar rely arctic volcano long crawl hungry vocal artwork sniff fantasy very lucky have athlete"; @@ -481,73 +479,6 @@ impl LighthouseGethNode { Ok(()) } - fn internal_execute_transaction<'a>( - transaction: TransactionRequest, - provider: FillProvider< - impl TxFiller + 'a, - impl Provider + Clone + 'a, - Ethereum, - >, - ) -> Pin> + 'a>> { - Box::pin(async move { - let pending_transaction = provider - .send_transaction(transaction) - .await - .inspect_err(|err| { - tracing::error!( - %err, - "Encountered an error when submitting the transaction" - ) - }) - .context("Failed to submit transaction to geth node")?; - let transaction_hash = *pending_transaction.tx_hash(); - - // The following is a fix for the "transaction indexing is in progress" error that we - // used to get. You can find more information on this in the following GH issue in geth - // https://github.com/ethereum/go-ethereum/issues/28877. To summarize what's going on, - // before we can get the receipt of the transaction it needs to have been indexed by the - // node's indexer. Just because the transaction has been confirmed it doesn't mean that - // it has been indexed. When we call alloy's `get_receipt` it checks if the transaction - // was confirmed. If it has been, then it will call `eth_getTransactionReceipt` method - // which _might_ return the above error if the tx has not yet been indexed yet. So, we - // need to implement a retry mechanism for the receipt to keep retrying to get it until - // it eventually works, but we only do that if the error we get back is the "transaction - // indexing is in progress" error or if the receipt is None. - // - // Getting the transaction indexed and taking a receipt can take a long time especially - // when a lot of transactions are being submitted to the node. Thus, while initially we - // only allowed for 60 seconds of waiting with a 1 second delay in polling, we need to - // allow for a larger wait time. Therefore, in here we allow for 5 minutes of waiting - // with exponential backoff each time we attempt to get the receipt and find that it's - // not available. - poll( - Self::RECEIPT_POLLING_DURATION, - PollingWaitBehavior::Constant(Duration::from_millis(500)), - move || { - let provider = provider.clone(); - async move { - match provider.get_transaction_receipt(transaction_hash).await { - Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)), - Ok(None) => Ok(ControlFlow::Continue(())), - Err(error) => { - let error_string = error.to_string(); - match error_string.contains(Self::TRANSACTION_INDEXING_ERROR) { - true => Ok(ControlFlow::Continue(())), - false => Err(error.into()), - } - } - } - } - }, - ) - .instrument(tracing::info_span!( - "Awaiting transaction receipt", - ?transaction_hash - )) - .await - }) - } - pub fn node_genesis(mut genesis: Genesis, wallet: &EthereumWallet) -> Genesis { for signer_address in NetworkWallet::::signer_addresses(&wallet) { genesis @@ -626,11 +557,15 @@ impl EthereumNode for LighthouseGethNode { transaction: TransactionRequest, ) -> Pin> + '_>> { Box::pin(async move { - let provider = self - .http_provider() + self.provider() .await - .context("Failed to create provider for transaction execution")?; - Self::internal_execute_transaction(transaction, provider).await + .context("Failed to create provider for transaction submission")? + .send_transaction(transaction) + .await + .context("Encountered an error when submitting a transaction")? + .get_receipt() + .await + .context("Failed to get the receipt for the transaction") }) } diff --git a/crates/node/src/node_implementations/polkadot_omni_node.rs b/crates/node/src/node_implementations/polkadot_omni_node.rs index def6e2e..e3a17e3 100644 --- a/crates/node/src/node_implementations/polkadot_omni_node.rs +++ b/crates/node/src/node_implementations/polkadot_omni_node.rs @@ -50,10 +50,7 @@ use crate::{ Node, constants::INITIAL_BALANCE, helpers::{Process, ProcessReadinessWaitBehavior}, - provider_utils::{ - ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider, - execute_transaction, - }, + provider_utils::{ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider}, }; static NODE_COUNT: AtomicU32 = AtomicU32::new(0); @@ -464,11 +461,15 @@ impl EthereumNode for PolkadotOmnichainNode { transaction: TransactionRequest, ) -> Pin> + '_>> { Box::pin(async move { - let provider = self - .provider() + self.provider() .await - .context("Failed to create the provider")?; - execute_transaction(provider, transaction).await + .context("Failed to create provider for transaction submission")? + .send_transaction(transaction) + .await + .context("Encountered an error when submitting a transaction")? + .get_receipt() + .await + .context("Failed to get the receipt for the transaction") }) } diff --git a/crates/node/src/node_implementations/substrate.rs b/crates/node/src/node_implementations/substrate.rs index f29a706..a25ea80 100644 --- a/crates/node/src/node_implementations/substrate.rs +++ b/crates/node/src/node_implementations/substrate.rs @@ -49,10 +49,7 @@ use crate::{ Node, constants::INITIAL_BALANCE, helpers::{Process, ProcessReadinessWaitBehavior}, - provider_utils::{ - ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider, - execute_transaction, - }, + provider_utils::{ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider}, }; static NODE_COUNT: AtomicU32 = AtomicU32::new(0); @@ -434,11 +431,15 @@ impl EthereumNode for SubstrateNode { transaction: TransactionRequest, ) -> Pin> + '_>> { Box::pin(async move { - let provider = self - .provider() + self.provider() .await - .context("Failed to create the provider")?; - execute_transaction(provider, transaction).await + .context("Failed to create provider for transaction submission")? + .send_transaction(transaction) + .await + .context("Encountered an error when submitting a transaction")? + .get_receipt() + .await + .context("Failed to get the receipt for the transaction") }) } diff --git a/crates/node/src/node_implementations/zombienet.rs b/crates/node/src/node_implementations/zombienet.rs index 3d1ed3e..ace8469 100644 --- a/crates/node/src/node_implementations/zombienet.rs +++ b/crates/node/src/node_implementations/zombienet.rs @@ -76,10 +76,7 @@ use crate::{ Node, constants::INITIAL_BALANCE, helpers::{Process, ProcessReadinessWaitBehavior}, - provider_utils::{ - ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider, - execute_transaction, - }, + provider_utils::{ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider}, }; static NODE_COUNT: AtomicU32 = AtomicU32::new(0); @@ -433,14 +430,18 @@ impl EthereumNode for ZombienetNode { fn execute_transaction( &self, - transaction: alloy::rpc::types::TransactionRequest, + transaction: TransactionRequest, ) -> Pin> + '_>> { Box::pin(async move { - let provider = self - .provider() + self.provider() .await - .context("Failed to create the provider")?; - execute_transaction(provider, transaction).await + .context("Failed to create provider for transaction submission")? + .send_transaction(transaction) + .await + .context("Encountered an error when submitting a transaction")? + .get_receipt() + .await + .context("Failed to get the receipt for the transaction") }) } diff --git a/crates/node/src/provider_utils/mod.rs b/crates/node/src/provider_utils/mod.rs index c556a71..2610d62 100644 --- a/crates/node/src/provider_utils/mod.rs +++ b/crates/node/src/provider_utils/mod.rs @@ -1,7 +1,9 @@ mod concurrency_limiter; mod fallback_gas_filler; mod provider; +mod receipt_retry_layer; pub use concurrency_limiter::*; pub use fallback_gas_filler::*; pub use provider::*; +pub use receipt_retry_layer::*; diff --git a/crates/node/src/provider_utils/provider.rs b/crates/node/src/provider_utils/provider.rs index f10b3b6..b08e81e 100644 --- a/crates/node/src/provider_utils/provider.rs +++ b/crates/node/src/provider_utils/provider.rs @@ -1,25 +1,25 @@ -use std::{ops::ControlFlow, sync::LazyLock, time::Duration}; +use std::{sync::LazyLock, time::Duration}; use alloy::{ - network::{Ethereum, Network, NetworkWallet, TransactionBuilder4844}, + network::{Network, NetworkWallet, TransactionBuilder4844}, providers::{ - Identity, PendingTransactionBuilder, Provider, ProviderBuilder, RootProvider, + Identity, ProviderBuilder, RootProvider, fillers::{ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller, WalletFiller}, }, rpc::client::ClientBuilder, }; use anyhow::{Context, Result}; -use revive_dt_common::futures::{PollingWaitBehavior, poll}; -use tracing::{Instrument, debug, info, info_span}; -use crate::provider_utils::{ConcurrencyLimiterLayer, FallbackGasFiller}; +use crate::provider_utils::{ + ConcurrencyLimiterLayer, FallbackGasFiller, ReceiptRetryLayer, ReceiptRetryProvider, +}; pub type ConcreteProvider = FillProvider< JoinFill< JoinFill, ChainIdFiller>, NonceFiller>, WalletFiller, >, - RootProvider, + ReceiptRetryProvider, N>, N, >; @@ -55,6 +55,11 @@ where let provider = ProviderBuilder::new() .disable_recommended_fillers() .network::() + .layer( + ReceiptRetryLayer::default() + .with_polling_duration(Duration::from_secs(90)) + .with_polling_interval(Duration::from_millis(500)), + ) .filler(fallback_gas_filler) .filler(chain_id_filler) .filler(nonce_filler) @@ -63,70 +68,3 @@ where Ok(provider) } - -pub async fn execute_transaction( - provider: ConcreteProvider, - transaction: N::TransactionRequest, -) -> Result -where - N: Network< - TransactionRequest: TransactionBuilder4844, - TxEnvelope = ::TxEnvelope, - >, - W: NetworkWallet, - Identity: TxFiller, - FallbackGasFiller: TxFiller, - ChainIdFiller: TxFiller, - NonceFiller: TxFiller, - WalletFiller: TxFiller, -{ - let sendable_transaction = provider - .fill(transaction) - .await - .context("Failed to fill transaction")?; - - let transaction_envelope = sendable_transaction - .try_into_envelope() - .context("Failed to convert transaction into an envelope")?; - let tx_hash = *transaction_envelope.tx_hash(); - - let mut pending_transaction = match provider.send_tx_envelope(transaction_envelope).await { - Ok(pending_transaction) => pending_transaction, - Err(error) => { - let error_string = error.to_string(); - - if error_string.contains("Transaction Already Imported") { - PendingTransactionBuilder::::new(provider.root().clone(), tx_hash) - } else { - return Err(error).context(format!("Failed to submit transaction {tx_hash}")); - } - } - }; - debug!(%tx_hash, "Submitted Transaction"); - - pending_transaction.set_timeout(Some(Duration::from_secs(120))); - let tx_hash = pending_transaction.watch().await.context(format!( - "Transaction inclusion watching timeout for {tx_hash}" - ))?; - - poll( - Duration::from_secs(60), - PollingWaitBehavior::Constant(Duration::from_secs(3)), - || { - let provider = provider.clone(); - - async move { - match provider.get_transaction_receipt(tx_hash).await { - Ok(Some(receipt)) => { - info!("Found the transaction receipt"); - Ok(ControlFlow::Break(receipt)) - } - _ => Ok(ControlFlow::Continue(())), - } - } - }, - ) - .instrument(info_span!("Polling for receipt", %tx_hash)) - .await - .context(format!("Polling for receipt failed for {tx_hash}")) -} diff --git a/crates/node/src/provider_utils/receipt_retry_layer.rs b/crates/node/src/provider_utils/receipt_retry_layer.rs new file mode 100644 index 0000000..e521ae9 --- /dev/null +++ b/crates/node/src/provider_utils/receipt_retry_layer.rs @@ -0,0 +1,151 @@ +use std::{marker::PhantomData, time::Duration}; + +use alloy::{ + network::Network, + primitives::TxHash, + providers::{Provider, ProviderCall, ProviderLayer, RootProvider}, + transports::{RpcError, TransportErrorKind}, +}; +use tokio::time::{interval, timeout}; + +/// A layer that allows for automatic retries for getting the receipt. +/// +/// There are certain cases where getting the receipt of a committed transaction might fail. In Geth +/// this can happen if the transaction has been committed to the ledger but has not been indexed, in +/// the substrate and revive stack it can also happen for other reasons. +/// +/// Therefore, just because the first attempt to get the receipt (after transaction confirmation) +/// has failed it doesn't mean that it will continue to fail. This layer can be added to any alloy +/// provider to allow the provider to retry getting the receipt for some period of time before it +/// considers that a timeout. It attempts to poll for the receipt for the `polling_duration` with an +/// interval of `polling_interval` between each poll. If by the end of the `polling_duration` it was +/// not able to get the receipt successfully then this is considered to be a timeout. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct ReceiptRetryLayer { + /// The amount of time to keep polling for the receipt before considering it a timeout. + polling_duration: Duration, + + /// The interval of time to wait between each poll for the receipt. + polling_interval: Duration, +} + +impl ReceiptRetryLayer { + pub fn new(polling_duration: Duration, polling_interval: Duration) -> Self { + Self { + polling_duration, + polling_interval, + } + } + + pub fn with_polling_duration(mut self, polling_duration: Duration) -> Self { + self.polling_duration = polling_duration; + self + } + + pub fn with_polling_interval(mut self, polling_interval: Duration) -> Self { + self.polling_interval = polling_interval; + self + } +} + +impl Default for ReceiptRetryLayer { + fn default() -> Self { + Self { + polling_duration: Duration::from_secs(90), + polling_interval: Duration::from_millis(500), + } + } +} + +impl ProviderLayer for ReceiptRetryLayer +where + P: Provider, + N: Network, +{ + type Provider = ReceiptRetryProvider; + + fn layer(&self, inner: P) -> Self::Provider { + ReceiptRetryProvider::new(self.polling_duration, self.polling_interval, inner) + } +} + +#[derive(Debug, Clone)] +pub struct ReceiptRetryProvider { + /// The amount of time to keep polling for the receipt before considering it a timeout. + polling_duration: Duration, + + /// The interval of time to wait between each poll for the receipt. + polling_interval: Duration, + + /// Inner provider. + inner: P, + + /// Phantom data + phantom: PhantomData, +} + +impl ReceiptRetryProvider +where + P: Provider, + N: Network, +{ + /// Instantiate a new cache provider. + pub const fn new(polling_duration: Duration, polling_interval: Duration, inner: P) -> Self { + Self { + inner, + polling_duration, + polling_interval, + phantom: PhantomData, + } + } +} + +impl Provider for ReceiptRetryProvider +where + P: Provider, + N: Network, +{ + #[inline(always)] + fn root(&self) -> &RootProvider { + self.inner.root() + } + + fn get_transaction_receipt( + &self, + hash: TxHash, + ) -> ProviderCall<(TxHash,), Option<::ReceiptResponse>> { + let client = self.inner.weak_client(); + let polling_duration = self.polling_duration; + let polling_interval = self.polling_interval; + + ProviderCall::BoxedFuture(Box::pin(async move { + let client = client + .upgrade() + .ok_or_else(|| TransportErrorKind::custom_str("RPC client dropped"))?; + + let receipt = timeout(polling_duration, async move { + let mut interval = interval(polling_interval); + + loop { + let result = client + .request::<(TxHash,), Option<::ReceiptResponse>>( + "eth_getTransactionReceipt", + (hash,), + ) + .await; + if let Ok(Some(receipt)) = result { + return receipt; + } + + interval.tick().await; + } + }) + .await + .map_err(|_| { + RpcError::local_usage_str("Timeout when waiting for transaction receipt") + })?; + + Ok(Some(receipt)) + })) + } +}