From 8619e7feb0f9833f61b5861475a513b411ea336b Mon Sep 17 00:00:00 2001 From: Omar Date: Wed, 6 Aug 2025 15:25:39 +0300 Subject: [PATCH] Fix the transaction tracing issues (#118) * Set the gc mode to archive in geth * Add a maximum to the exponential backoff wait duration * Edit the formatting of the CLI case reporter --- Cargo.lock | 1 + crates/common/Cargo.toml | 1 + crates/common/src/futures/mod.rs | 3 + crates/common/src/futures/poll.rs | 69 ++++++++++++++++ crates/common/src/lib.rs | 1 + crates/core/src/main.rs | 6 +- crates/node/src/geth.rs | 129 +++++++++++++++++------------- 7 files changed, 152 insertions(+), 58 deletions(-) create mode 100644 crates/common/src/futures/mod.rs create mode 100644 crates/common/src/futures/poll.rs diff --git a/Cargo.lock b/Cargo.lock index 230d945..02d801f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4030,6 +4030,7 @@ version = "0.1.0" dependencies = [ "anyhow", "semver 1.0.26", + "tokio", ] [[package]] diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index c804254..516b1be 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -11,3 +11,4 @@ rust-version.workspace = true [dependencies] anyhow = { workspace = true } semver = { workspace = true } +tokio = { workspace = true, default-features = false, features = ["time"] } diff --git a/crates/common/src/futures/mod.rs b/crates/common/src/futures/mod.rs new file mode 100644 index 0000000..17a83b2 --- /dev/null +++ b/crates/common/src/futures/mod.rs @@ -0,0 +1,3 @@ +mod poll; + +pub use poll::*; diff --git a/crates/common/src/futures/poll.rs b/crates/common/src/futures/poll.rs new file mode 100644 index 0000000..2697541 --- /dev/null +++ b/crates/common/src/futures/poll.rs @@ -0,0 +1,69 @@ +use std::ops::ControlFlow; +use std::time::Duration; + +use anyhow::{Result, anyhow}; + +const EXPONENTIAL_BACKOFF_MAX_WAIT_DURATION: Duration = Duration::from_secs(60); + +/// A function that polls for a fallible future for some period of time and errors if it fails to +/// get a result after polling. +/// +/// Given a future that returns a [`Result>`], this function calls the future +/// repeatedly (with some wait period) until the future returns a [`ControlFlow::Break`] or until it +/// returns an [`Err`] in which case the function stops polling and returns the error. +/// +/// If the future keeps returning [`ControlFlow::Continue`] and fails to return a [`Break`] within +/// the permitted polling duration then this function returns an [`Err`] +/// +/// [`Break`]: ControlFlow::Break +/// [`Continue`]: ControlFlow::Continue +pub async fn poll( + polling_duration: Duration, + polling_wait_behavior: PollingWaitBehavior, + mut future: impl FnMut() -> F, +) -> Result +where + F: Future>>, +{ + let mut retries = 0; + let mut total_wait_duration = Duration::ZERO; + let max_allowed_wait_duration = polling_duration; + + loop { + if total_wait_duration >= max_allowed_wait_duration { + break Err(anyhow!( + "Polling failed after {} retries and a total of {:?} of wait time", + retries, + total_wait_duration + )); + } + + match future().await? { + ControlFlow::Continue(()) => { + let next_wait_duration = match polling_wait_behavior { + PollingWaitBehavior::Constant(duration) => duration, + PollingWaitBehavior::ExponentialBackoff => { + Duration::from_secs(2u64.pow(retries)) + .min(EXPONENTIAL_BACKOFF_MAX_WAIT_DURATION) + } + }; + let next_wait_duration = + next_wait_duration.min(max_allowed_wait_duration - total_wait_duration); + total_wait_duration += next_wait_duration; + retries += 1; + + tokio::time::sleep(next_wait_duration).await; + } + ControlFlow::Break(output) => { + break Ok(output); + } + } + } +} + +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +pub enum PollingWaitBehavior { + Constant(Duration), + #[default] + ExponentialBackoff, +} diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index e280af5..9649c36 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -2,6 +2,7 @@ //! the workspace can benefit from. pub mod fs; +pub mod futures; pub mod iterators; pub mod macros; pub mod types; diff --git a/crates/core/src/main.rs b/crates/core/src/main.rs index 78ea14a..2cfd02a 100644 --- a/crates/core/src/main.rs +++ b/crates/core/src/main.rs @@ -248,10 +248,10 @@ where }) .collect::>(); case_status.sort_by(|a, b| a.0.cmp(&b.0)); - for (_, case_name, case_status) in case_status.into_iter() { + for (case_idx, case_name, case_status) in case_status.into_iter() { if case_status { eprintln!( - "{GREEN} Case Succeeded:{RESET} {}", + " {GREEN}Case Succeeded:{RESET} {} - Case Idx: {case_idx}", case_name .as_ref() .map(|string| string.as_str()) @@ -259,7 +259,7 @@ where ) } else { eprintln!( - "{RED} Case Failed:{RESET} {}", + " {RED}Case Failed:{RESET} {} - Case Idx: {case_idx}", case_name .as_ref() .map(|string| string.as_str()) diff --git a/crates/node/src/geth.rs b/crates/node/src/geth.rs index 2760d04..9ba595a 100644 --- a/crates/node/src/geth.rs +++ b/crates/node/src/geth.rs @@ -3,9 +3,13 @@ use std::{ fs::{File, OpenOptions, create_dir_all, remove_dir_all}, io::{BufRead, BufReader, Read, Write}, + ops::ControlFlow, path::PathBuf, process::{Child, Command, Stdio}, - sync::atomic::{AtomicU32, Ordering}, + sync::{ + Arc, + atomic::{AtomicU32, Ordering}, + }, time::{Duration, Instant}, }; @@ -25,11 +29,12 @@ use alloy::{ }, signers::local::PrivateKeySigner, }; -use revive_dt_common::fs::clear_directory; +use tracing::{Instrument, Level}; + +use revive_dt_common::{fs::clear_directory, futures::poll}; use revive_dt_config::Arguments; use revive_dt_format::traits::ResolverApi; use revive_dt_node_interaction::EthereumNode; -use tracing::Level; use crate::{Node, common::FallbackGasFiller, constants::INITIAL_BALANCE}; @@ -77,6 +82,10 @@ impl GethNode { const GETH_STDERR_LOG_FILE_NAME: &str = "node_stderr.log"; const TRANSACTION_INDEXING_ERROR: &str = "transaction indexing is in progress"; + const TRANSACTION_TRACING_ERROR: &str = "historical state not available in path scheme yet"; + + const RECEIPT_POLLING_DURATION: Duration = Duration::from_secs(5 * 60); + const TRACE_POLLING_DURATION: Duration = Duration::from_secs(60); /// Create the node directory and call `geth init` to configure the genesis. #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] @@ -102,6 +111,8 @@ impl GethNode { serde_json::to_writer(File::create(&genesis_path)?, &genesis)?; let mut child = Command::new(&self.geth) + .arg("--state.scheme") + .arg("hash") .arg("init") .arg("--datadir") .arg(&self.data_directory) @@ -159,6 +170,12 @@ impl GethNode { .arg("0") .arg("--cache.blocklogs") .arg("512") + .arg("--state.scheme") + .arg("hash") + .arg("--syncmode") + .arg("full") + .arg("--gcmode") + .arg("archive") .stderr(stderr_logs_file.try_clone()?) .stdout(stdout_logs_file.try_clone()?) .spawn()? @@ -248,22 +265,17 @@ impl GethNode { } impl EthereumNode for GethNode { - #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] + #[tracing::instrument(level = "info", skip_all, fields(geth_node_id = self.id))] async fn execute_transaction( &self, transaction: TransactionRequest, ) -> anyhow::Result { - let outer_span = tracing::debug_span!("Submitting transaction", ?transaction); - let _outer_guard = outer_span.enter(); - - let provider = self.provider().await?; - - let pending_transaction = provider.send_transaction(transaction).await?; - let transaction_hash = pending_transaction.tx_hash(); - - let span = tracing::info_span!("Awaiting transaction receipt", ?transaction_hash); + let span = tracing::debug_span!("Submitting transaction", ?transaction); let _guard = span.enter(); + let provider = Arc::new(self.provider().await?); + let transaction_hash = *provider.send_transaction(transaction).await?.tx_hash(); + // The following is a fix for the "transaction indexing is in progress" error that we // used to get. You can find more information on this in the following GH issue in geth // https://github.com/ethereum/go-ethereum/issues/28877. To summarize what's going on, @@ -282,57 +294,64 @@ impl EthereumNode for GethNode { // allow for a larger wait time. Therefore, in here we allow for 5 minutes of waiting // with exponential backoff each time we attempt to get the receipt and find that it's // not available. - let mut retries = 0; - let mut total_wait_duration = Duration::from_secs(0); - let max_allowed_wait_duration = Duration::from_secs(5 * 60); - loop { - if total_wait_duration >= max_allowed_wait_duration { - tracing::error!( - ?total_wait_duration, - ?max_allowed_wait_duration, - retry_count = retries, - "Failed to get receipt after polling for it" - ); - anyhow::bail!( - "Polled for receipt for {total_wait_duration:?} but failed to get it" - ); - } - - match provider.get_transaction_receipt(*transaction_hash).await { - Ok(Some(receipt)) => { - tracing::info!(?total_wait_duration, "Found receipt"); - break Ok(receipt); - } - Ok(None) => {} - Err(error) => { - let error_string = error.to_string(); - if !error_string.contains(Self::TRANSACTION_INDEXING_ERROR) { - break Err(error.into()); + poll( + Self::RECEIPT_POLLING_DURATION, + Default::default(), + move || { + let provider = provider.clone(); + async move { + match provider.get_transaction_receipt(transaction_hash).await { + Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)), + Ok(None) => Ok(ControlFlow::Continue(())), + Err(error) => { + let error_string = error.to_string(); + match error_string.contains(Self::TRANSACTION_INDEXING_ERROR) { + true => Ok(ControlFlow::Continue(())), + false => Err(error.into()), + } + } } } - }; - - let next_wait_duration = Duration::from_secs(2u64.pow(retries)) - .min(max_allowed_wait_duration - total_wait_duration); - total_wait_duration += next_wait_duration; - retries += 1; - - tokio::time::sleep(next_wait_duration).await; - } + }, + ) + .instrument(tracing::info_span!( + "Awaiting transaction receipt", + ?transaction_hash + )) + .await } - #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] + #[tracing::instrument(level = "info", skip_all, fields(geth_node_id = self.id))] async fn trace_transaction( &self, transaction: &TransactionReceipt, trace_options: GethDebugTracingOptions, ) -> anyhow::Result { - let tx_hash = transaction.transaction_hash; - Ok(self - .provider() - .await? - .debug_trace_transaction(tx_hash, trace_options) - .await?) + let provider = Arc::new(self.provider().await?); + poll( + Self::TRACE_POLLING_DURATION, + Default::default(), + move || { + let provider = provider.clone(); + let trace_options = trace_options.clone(); + async move { + match provider + .debug_trace_transaction(transaction.transaction_hash, trace_options) + .await + { + Ok(trace) => Ok(ControlFlow::Break(trace)), + Err(error) => { + let error_string = error.to_string(); + match error_string.contains(Self::TRANSACTION_TRACING_ERROR) { + true => Ok(ControlFlow::Continue(())), + false => Err(error.into()), + } + } + } + } + }, + ) + .await } #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]