From 8a05f8e6e87838b2732b37f0311b08a3b4a7e654 Mon Sep 17 00:00:00 2001 From: Omar Abdulla Date: Wed, 17 Sep 2025 06:01:13 +0300 Subject: [PATCH] Make the ethereum node trait object compatible --- crates/core/src/driver/mod.rs | 16 +- crates/core/src/lib.rs | 2 +- crates/node-interaction/src/lib.rs | 19 ++- crates/node/src/geth.rs | 258 ++++++++++++++++------------- crates/node/src/kitchensink.rs | 126 ++++++++------ 5 files changed, 231 insertions(+), 190 deletions(-) diff --git a/crates/core/src/driver/mod.rs b/crates/core/src/driver/mod.rs index b683bde..d66cdfa 100644 --- a/crates/core/src/driver/mod.rs +++ b/crates/core/src/driver/mod.rs @@ -8,7 +8,7 @@ use alloy::consensus::EMPTY_ROOT_HASH; use alloy::hex; use alloy::json_abi::JsonAbi; use alloy::network::{Ethereum, TransactionBuilder}; -use alloy::primitives::U256; +use alloy::primitives::{TxHash, U256}; use alloy::rpc::types::TransactionReceipt; use alloy::rpc::types::trace::geth::{ CallFrame, GethDebugBuiltInTracerType, GethDebugTracerConfig, GethDebugTracerType, @@ -124,14 +124,14 @@ where .await .context("Failed during transaction execution phase of input handling")?; let tracing_result = self - .handle_input_call_frame_tracing(&execution_receipt, node) + .handle_input_call_frame_tracing(execution_receipt.transaction_hash, node) .await .context("Failed during callframe tracing phase of input handling")?; self.handle_input_variable_assignment(input, &tracing_result) .context("Failed to assign variables from callframe output")?; let (_, (geth_trace, diff_mode)) = try_join!( self.handle_input_expectations(input, &execution_receipt, node, &tracing_result), - self.handle_input_diff(&execution_receipt, node) + self.handle_input_diff(execution_receipt.transaction_hash, node) ) .context("Failed while evaluating expectations and diffs in parallel")?; Ok((execution_receipt, geth_trace, diff_mode)) @@ -250,11 +250,11 @@ where #[instrument(level = "info", skip_all)] async fn handle_input_call_frame_tracing( &self, - execution_receipt: &TransactionReceipt, + tx_hash: TxHash, node: &T::Blockchain, ) -> anyhow::Result { node.trace_transaction( - execution_receipt, + tx_hash, GethDebugTracingOptions { tracer: Some(GethDebugTracerType::BuiltInTracer( GethDebugBuiltInTracerType::CallTracer, @@ -507,7 +507,7 @@ where #[instrument(level = "info", skip_all)] async fn handle_input_diff( &self, - execution_receipt: &TransactionReceipt, + tx_hash: TxHash, node: &T::Blockchain, ) -> anyhow::Result<(GethTrace, DiffMode)> { let trace_options = GethDebugTracingOptions::prestate_tracer(PreStateConfig { @@ -517,11 +517,11 @@ where }); let trace = node - .trace_transaction(execution_receipt, trace_options) + .trace_transaction(tx_hash, trace_options) .await .context("Failed to obtain geth prestate tracer output")?; let diff = node - .state_diff(execution_receipt) + .state_diff(tx_hash) .await .context("Failed to obtain state diff for transaction")?; diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index b2a0db9..c6e450b 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -53,5 +53,5 @@ pub trait DynPlatform { /// Creates a new node for the platform by spawning a new thread, creating the node object, /// initializing it, spawning it, and waiting for it to start up. - fn new_node(&self) -> Box; + fn new_node(&self) -> Box; } diff --git a/crates/node-interaction/src/lib.rs b/crates/node-interaction/src/lib.rs index d44cebb..ece04d9 100644 --- a/crates/node-interaction/src/lib.rs +++ b/crates/node-interaction/src/lib.rs @@ -1,39 +1,42 @@ //! This crate implements all node interactions. -use alloy::primitives::{Address, StorageKey, U256}; +use std::pin::Pin; + +use alloy::primitives::{Address, StorageKey, TxHash, U256}; use alloy::rpc::types::trace::geth::{DiffMode, GethDebugTracingOptions, GethTrace}; use alloy::rpc::types::{EIP1186AccountProofResponse, TransactionReceipt, TransactionRequest}; use anyhow::Result; use revive_dt_format::traits::ResolverApi; /// An interface for all interactions with Ethereum compatible nodes. +#[allow(clippy::type_complexity)] pub trait EthereumNode { /// Execute the [TransactionRequest] and return a [TransactionReceipt]. fn execute_transaction( &self, transaction: TransactionRequest, - ) -> impl Future>; + ) -> Pin> + '_>>; /// Trace the transaction in the [TransactionReceipt] and return a [GethTrace]. fn trace_transaction( &self, - receipt: &TransactionReceipt, + tx_hash: TxHash, trace_options: GethDebugTracingOptions, - ) -> impl Future>; + ) -> Pin> + '_>>; /// Returns the state diff of the transaction hash in the [TransactionReceipt]. - fn state_diff(&self, receipt: &TransactionReceipt) -> impl Future>; + fn state_diff(&self, tx_hash: TxHash) -> Pin> + '_>>; /// Returns the balance of the provided [`Address`] back. - fn balance_of(&self, address: Address) -> impl Future>; + fn balance_of(&self, address: Address) -> Pin> + '_>>; /// Returns the latest storage proof of the provided [`Address`] fn latest_state_proof( &self, address: Address, keys: Vec, - ) -> impl Future>; + ) -> Pin> + '_>>; /// Returns the resolver that is to use with this ethereum node. - fn resolver(&self) -> impl Future>>; + fn resolver(&self) -> Pin>> + '_>>; } diff --git a/crates/node/src/geth.rs b/crates/node/src/geth.rs index 5b0e423..3b73fdc 100644 --- a/crates/node/src/geth.rs +++ b/crates/node/src/geth.rs @@ -25,7 +25,7 @@ use alloy::{ fillers::{CachedNonceManager, ChainIdFiller, FillProvider, NonceFiller, TxFiller}, }, rpc::types::{ - EIP1186AccountProofResponse, TransactionReceipt, TransactionRequest, + EIP1186AccountProofResponse, TransactionRequest, trace::geth::{DiffMode, GethDebugTracingOptions, PreStateConfig, PreStateFrame}, }, }; @@ -296,152 +296,172 @@ impl EthereumNode for GethNode { fields(geth_node_id = self.id, connection_string = self.connection_string), err, )] - async fn execute_transaction( + fn execute_transaction( &self, transaction: TransactionRequest, - ) -> anyhow::Result { - let provider = self - .provider() - .await - .context("Failed to create provider for transaction submission")?; + ) -> Pin> + '_>> + { + Box::pin(async move { + let provider = self + .provider() + .await + .context("Failed to create provider for transaction submission")?; - let pending_transaction = provider + let pending_transaction = provider .send_transaction(transaction) .await .inspect_err( |err| tracing::error!(%err, "Encountered an error when submitting the transaction"), ) .context("Failed to submit transaction to geth node")?; - let transaction_hash = *pending_transaction.tx_hash(); + let transaction_hash = *pending_transaction.tx_hash(); - // The following is a fix for the "transaction indexing is in progress" error that we used - // to get. You can find more information on this in the following GH issue in geth - // https://github.com/ethereum/go-ethereum/issues/28877. To summarize what's going on, - // before we can get the receipt of the transaction it needs to have been indexed by the - // node's indexer. Just because the transaction has been confirmed it doesn't mean that it - // has been indexed. When we call alloy's `get_receipt` it checks if the transaction was - // confirmed. If it has been, then it will call `eth_getTransactionReceipt` method which - // _might_ return the above error if the tx has not yet been indexed yet. So, we need to - // implement a retry mechanism for the receipt to keep retrying to get it until it - // eventually works, but we only do that if the error we get back is the "transaction - // indexing is in progress" error or if the receipt is None. - // - // Getting the transaction indexed and taking a receipt can take a long time especially when - // a lot of transactions are being submitted to the node. Thus, while initially we only - // allowed for 60 seconds of waiting with a 1 second delay in polling, we need to allow for - // a larger wait time. Therefore, in here we allow for 5 minutes of waiting with exponential - // backoff each time we attempt to get the receipt and find that it's not available. - let provider = Arc::new(provider); - poll( - Self::RECEIPT_POLLING_DURATION, - PollingWaitBehavior::Constant(Duration::from_millis(200)), - move || { - let provider = provider.clone(); - async move { - match provider.get_transaction_receipt(transaction_hash).await { - Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)), - Ok(None) => Ok(ControlFlow::Continue(())), - Err(error) => { - let error_string = error.to_string(); - match error_string.contains(Self::TRANSACTION_INDEXING_ERROR) { - true => Ok(ControlFlow::Continue(())), - false => Err(error.into()), + // The following is a fix for the "transaction indexing is in progress" error that we used + // to get. You can find more information on this in the following GH issue in geth + // https://github.com/ethereum/go-ethereum/issues/28877. To summarize what's going on, + // before we can get the receipt of the transaction it needs to have been indexed by the + // node's indexer. Just because the transaction has been confirmed it doesn't mean that it + // has been indexed. When we call alloy's `get_receipt` it checks if the transaction was + // confirmed. If it has been, then it will call `eth_getTransactionReceipt` method which + // _might_ return the above error if the tx has not yet been indexed yet. So, we need to + // implement a retry mechanism for the receipt to keep retrying to get it until it + // eventually works, but we only do that if the error we get back is the "transaction + // indexing is in progress" error or if the receipt is None. + // + // Getting the transaction indexed and taking a receipt can take a long time especially when + // a lot of transactions are being submitted to the node. Thus, while initially we only + // allowed for 60 seconds of waiting with a 1 second delay in polling, we need to allow for + // a larger wait time. Therefore, in here we allow for 5 minutes of waiting with exponential + // backoff each time we attempt to get the receipt and find that it's not available. + let provider = Arc::new(provider); + poll( + Self::RECEIPT_POLLING_DURATION, + PollingWaitBehavior::Constant(Duration::from_millis(200)), + move || { + let provider = provider.clone(); + async move { + match provider.get_transaction_receipt(transaction_hash).await { + Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)), + Ok(None) => Ok(ControlFlow::Continue(())), + Err(error) => { + let error_string = error.to_string(); + match error_string.contains(Self::TRANSACTION_INDEXING_ERROR) { + true => Ok(ControlFlow::Continue(())), + false => Err(error.into()), + } } } } - } - }, - ) - .instrument(tracing::info_span!( - "Awaiting transaction receipt", - ?transaction_hash - )) - .await + }, + ) + .instrument(tracing::info_span!( + "Awaiting transaction receipt", + ?transaction_hash + )) + .await + }) } #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] - async fn trace_transaction( + fn trace_transaction( &self, - transaction: &TransactionReceipt, + tx_hash: TxHash, trace_options: GethDebugTracingOptions, - ) -> anyhow::Result { - let provider = Arc::new( + ) -> Pin> + '_>> + { + Box::pin(async move { + let provider = Arc::new( + self.provider() + .await + .context("Failed to create provider for tracing")?, + ); + poll( + Self::TRACE_POLLING_DURATION, + PollingWaitBehavior::Constant(Duration::from_millis(200)), + move || { + let provider = provider.clone(); + let trace_options = trace_options.clone(); + async move { + match provider + .debug_trace_transaction(tx_hash, trace_options) + .await + { + Ok(trace) => Ok(ControlFlow::Break(trace)), + Err(error) => { + let error_string = error.to_string(); + match error_string.contains(Self::TRANSACTION_TRACING_ERROR) { + true => Ok(ControlFlow::Continue(())), + false => Err(error.into()), + } + } + } + } + }, + ) + .await + }) + } + + #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] + fn state_diff( + &self, + tx_hash: TxHash, + ) -> Pin> + '_>> { + Box::pin(async move { + let trace_options = GethDebugTracingOptions::prestate_tracer(PreStateConfig { + diff_mode: Some(true), + disable_code: None, + disable_storage: None, + }); + match self + .trace_transaction(tx_hash, trace_options) + .await + .context("Failed to trace transaction for prestate diff")? + .try_into_pre_state_frame() + .context("Failed to convert trace into pre-state frame")? + { + PreStateFrame::Diff(diff) => Ok(diff), + _ => anyhow::bail!("expected a diff mode trace"), + } + }) + } + + #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] + fn balance_of( + &self, + address: Address, + ) -> Pin> + '_>> { + Box::pin(async move { self.provider() .await - .context("Failed to create provider for tracing")?, - ); - poll( - Self::TRACE_POLLING_DURATION, - PollingWaitBehavior::Constant(Duration::from_millis(200)), - move || { - let provider = provider.clone(); - let trace_options = trace_options.clone(); - async move { - match provider - .debug_trace_transaction(transaction.transaction_hash, trace_options) - .await - { - Ok(trace) => Ok(ControlFlow::Break(trace)), - Err(error) => { - let error_string = error.to_string(); - match error_string.contains(Self::TRANSACTION_TRACING_ERROR) { - true => Ok(ControlFlow::Continue(())), - false => Err(error.into()), - } - } - } - } - }, - ) - .await + .context("Failed to get the Geth provider")? + .get_balance(address) + .await + .map_err(Into::into) + }) } #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] - async fn state_diff(&self, transaction: &TransactionReceipt) -> anyhow::Result { - let trace_options = GethDebugTracingOptions::prestate_tracer(PreStateConfig { - diff_mode: Some(true), - disable_code: None, - disable_storage: None, - }); - match self - .trace_transaction(transaction, trace_options) - .await - .context("Failed to trace transaction for prestate diff")? - .try_into_pre_state_frame() - .context("Failed to convert trace into pre-state frame")? - { - PreStateFrame::Diff(diff) => Ok(diff), - _ => anyhow::bail!("expected a diff mode trace"), - } - } - - #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] - async fn balance_of(&self, address: Address) -> anyhow::Result { - self.provider() - .await - .context("Failed to get the Geth provider")? - .get_balance(address) - .await - .map_err(Into::into) - } - - #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] - async fn latest_state_proof( + fn latest_state_proof( &self, address: Address, keys: Vec, - ) -> anyhow::Result { - self.provider() - .await - .context("Failed to get the Geth provider")? - .get_proof(address, keys) - .latest() - .await - .map_err(Into::into) + ) -> Pin> + '_>> { + Box::pin(async move { + self.provider() + .await + .context("Failed to get the Geth provider")? + .get_proof(address, keys) + .latest() + .await + .map_err(Into::into) + }) } - #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] - fn resolver(&self) -> impl Future>> { + // #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] + fn resolver( + &self, + ) -> Pin>> + '_>> { Box::pin(async move { let id = self.id; let provider = self.provider().await?; diff --git a/crates/node/src/kitchensink.rs b/crates/node/src/kitchensink.rs index ccc7e90..c6f9d1e 100644 --- a/crates/node/src/kitchensink.rs +++ b/crates/node/src/kitchensink.rs @@ -412,77 +412,95 @@ impl KitchensinkNode { } impl EthereumNode for KitchensinkNode { - async fn execute_transaction( + fn execute_transaction( &self, transaction: alloy::rpc::types::TransactionRequest, - ) -> anyhow::Result { - let receipt = self - .provider() - .await - .context("Failed to create provider for transaction submission")? - .send_transaction(transaction) - .await - .context("Failed to submit transaction to kitchensink proxy")? - .get_receipt() - .await - .context("Failed to fetch transaction receipt from kitchensink proxy")?; - Ok(receipt) + ) -> Pin> + '_>> { + Box::pin(async move { + let receipt = self + .provider() + .await + .context("Failed to create provider for transaction submission")? + .send_transaction(transaction) + .await + .context("Failed to submit transaction to kitchensink proxy")? + .get_receipt() + .await + .context("Failed to fetch transaction receipt from kitchensink proxy")?; + Ok(receipt) + }) } - async fn trace_transaction( + fn trace_transaction( &self, - transaction: &TransactionReceipt, + tx_hash: TxHash, trace_options: GethDebugTracingOptions, - ) -> anyhow::Result { - let tx_hash = transaction.transaction_hash; - self.provider() - .await - .context("Failed to create provider for debug tracing")? - .debug_trace_transaction(tx_hash, trace_options) - .await - .context("Failed to obtain debug trace from kitchensink proxy") + ) -> Pin> + '_>> + { + Box::pin(async move { + self.provider() + .await + .context("Failed to create provider for debug tracing")? + .debug_trace_transaction(tx_hash, trace_options) + .await + .context("Failed to obtain debug trace from kitchensink proxy") + }) } - async fn state_diff(&self, transaction: &TransactionReceipt) -> anyhow::Result { - let trace_options = GethDebugTracingOptions::prestate_tracer(PreStateConfig { - diff_mode: Some(true), - disable_code: None, - disable_storage: None, - }); - match self - .trace_transaction(transaction, trace_options) - .await? - .try_into_pre_state_frame()? - { - PreStateFrame::Diff(diff) => Ok(diff), - _ => anyhow::bail!("expected a diff mode trace"), - } + fn state_diff( + &self, + tx_hash: TxHash, + ) -> Pin> + '_>> { + Box::pin(async move { + let trace_options = GethDebugTracingOptions::prestate_tracer(PreStateConfig { + diff_mode: Some(true), + disable_code: None, + disable_storage: None, + }); + match self + .trace_transaction(tx_hash, trace_options) + .await? + .try_into_pre_state_frame()? + { + PreStateFrame::Diff(diff) => Ok(diff), + _ => anyhow::bail!("expected a diff mode trace"), + } + }) } - async fn balance_of(&self, address: Address) -> anyhow::Result { - self.provider() - .await - .context("Failed to get the Kitchensink provider")? - .get_balance(address) - .await - .map_err(Into::into) + fn balance_of( + &self, + address: Address, + ) -> Pin> + '_>> { + Box::pin(async move { + self.provider() + .await + .context("Failed to get the Kitchensink provider")? + .get_balance(address) + .await + .map_err(Into::into) + }) } - async fn latest_state_proof( + fn latest_state_proof( &self, address: Address, keys: Vec, - ) -> anyhow::Result { - self.provider() - .await - .context("Failed to get the Kitchensink provider")? - .get_proof(address, keys) - .latest() - .await - .map_err(Into::into) + ) -> Pin> + '_>> { + Box::pin(async move { + self.provider() + .await + .context("Failed to get the Kitchensink provider")? + .get_proof(address, keys) + .latest() + .await + .map_err(Into::into) + }) } - fn resolver(&self) -> impl Future>> { + fn resolver( + &self, + ) -> Pin>> + '_>> { Box::pin(async move { let id = self.id; let provider = self.provider().await?;