Compare commits

...

3 Commits

Author SHA1 Message Date
Omar Abdulla ed1a0a1dcd Edit the formatting of the CLI case reporter 2025-08-06 15:09:22 +03:00
Omar Abdulla 746f5db66f Add a maximum to the exponential backoff wait duration 2025-08-06 14:55:48 +03:00
Omar Abdulla 1400086794 Set the gc mode to archive in geth 2025-08-06 13:53:37 +03:00
7 changed files with 152 additions and 58 deletions
Generated
+1
View File
@@ -4030,6 +4030,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"semver 1.0.26", "semver 1.0.26",
"tokio",
] ]
[[package]] [[package]]
+1
View File
@@ -11,3 +11,4 @@ rust-version.workspace = true
[dependencies] [dependencies]
anyhow = { workspace = true } anyhow = { workspace = true }
semver = { workspace = true } semver = { workspace = true }
tokio = { workspace = true, default-features = false, features = ["time"] }
+3
View File
@@ -0,0 +1,3 @@
mod poll;
pub use poll::*;
+69
View File
@@ -0,0 +1,69 @@
use std::ops::ControlFlow;
use std::time::Duration;
use anyhow::{Result, anyhow};
const EXPONENTIAL_BACKOFF_MAX_WAIT_DURATION: Duration = Duration::from_secs(60);
/// A function that polls for a fallible future for some period of time and errors if it fails to
/// get a result after polling.
///
/// Given a future that returns a [`Result<ControlFlow<O, ()>>`], this function calls the future
/// repeatedly (with some wait period) until the future returns a [`ControlFlow::Break`] or until it
/// returns an [`Err`] in which case the function stops polling and returns the error.
///
/// If the future keeps returning [`ControlFlow::Continue`] and fails to return a [`Break`] within
/// the permitted polling duration then this function returns an [`Err`]
///
/// [`Break`]: ControlFlow::Break
/// [`Continue`]: ControlFlow::Continue
pub async fn poll<F, O>(
polling_duration: Duration,
polling_wait_behavior: PollingWaitBehavior,
mut future: impl FnMut() -> F,
) -> Result<O>
where
F: Future<Output = Result<ControlFlow<O, ()>>>,
{
let mut retries = 0;
let mut total_wait_duration = Duration::ZERO;
let max_allowed_wait_duration = polling_duration;
loop {
if total_wait_duration >= max_allowed_wait_duration {
break Err(anyhow!(
"Polling failed after {} retries and a total of {:?} of wait time",
retries,
total_wait_duration
));
}
match future().await? {
ControlFlow::Continue(()) => {
let next_wait_duration = match polling_wait_behavior {
PollingWaitBehavior::Constant(duration) => duration,
PollingWaitBehavior::ExponentialBackoff => {
Duration::from_secs(2u64.pow(retries))
.min(EXPONENTIAL_BACKOFF_MAX_WAIT_DURATION)
}
};
let next_wait_duration =
next_wait_duration.min(max_allowed_wait_duration - total_wait_duration);
total_wait_duration += next_wait_duration;
retries += 1;
tokio::time::sleep(next_wait_duration).await;
}
ControlFlow::Break(output) => {
break Ok(output);
}
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum PollingWaitBehavior {
Constant(Duration),
#[default]
ExponentialBackoff,
}
+1
View File
@@ -2,6 +2,7 @@
//! the workspace can benefit from. //! the workspace can benefit from.
pub mod fs; pub mod fs;
pub mod futures;
pub mod iterators; pub mod iterators;
pub mod macros; pub mod macros;
pub mod types; pub mod types;
+3 -3
View File
@@ -248,10 +248,10 @@ where
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
case_status.sort_by(|a, b| a.0.cmp(&b.0)); case_status.sort_by(|a, b| a.0.cmp(&b.0));
for (_, case_name, case_status) in case_status.into_iter() { for (case_idx, case_name, case_status) in case_status.into_iter() {
if case_status { if case_status {
eprintln!( eprintln!(
"{GREEN} Case Succeeded:{RESET} {}", " {GREEN}Case Succeeded:{RESET} {} - Case Idx: {case_idx}",
case_name case_name
.as_ref() .as_ref()
.map(|string| string.as_str()) .map(|string| string.as_str())
@@ -259,7 +259,7 @@ where
) )
} else { } else {
eprintln!( eprintln!(
"{RED} Case Failed:{RESET} {}", " {RED}Case Failed:{RESET} {} - Case Idx: {case_idx}",
case_name case_name
.as_ref() .as_ref()
.map(|string| string.as_str()) .map(|string| string.as_str())
+74 -55
View File
@@ -3,9 +3,13 @@
use std::{ use std::{
fs::{File, OpenOptions, create_dir_all, remove_dir_all}, fs::{File, OpenOptions, create_dir_all, remove_dir_all},
io::{BufRead, BufReader, Read, Write}, io::{BufRead, BufReader, Read, Write},
ops::ControlFlow,
path::PathBuf, path::PathBuf,
process::{Child, Command, Stdio}, process::{Child, Command, Stdio},
sync::atomic::{AtomicU32, Ordering}, sync::{
Arc,
atomic::{AtomicU32, Ordering},
},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@@ -25,11 +29,12 @@ use alloy::{
}, },
signers::local::PrivateKeySigner, signers::local::PrivateKeySigner,
}; };
use revive_dt_common::fs::clear_directory; use tracing::{Instrument, Level};
use revive_dt_common::{fs::clear_directory, futures::poll};
use revive_dt_config::Arguments; use revive_dt_config::Arguments;
use revive_dt_format::traits::ResolverApi; use revive_dt_format::traits::ResolverApi;
use revive_dt_node_interaction::EthereumNode; use revive_dt_node_interaction::EthereumNode;
use tracing::Level;
use crate::{Node, common::FallbackGasFiller, constants::INITIAL_BALANCE}; use crate::{Node, common::FallbackGasFiller, constants::INITIAL_BALANCE};
@@ -77,6 +82,10 @@ impl GethNode {
const GETH_STDERR_LOG_FILE_NAME: &str = "node_stderr.log"; const GETH_STDERR_LOG_FILE_NAME: &str = "node_stderr.log";
const TRANSACTION_INDEXING_ERROR: &str = "transaction indexing is in progress"; const TRANSACTION_INDEXING_ERROR: &str = "transaction indexing is in progress";
const TRANSACTION_TRACING_ERROR: &str = "historical state not available in path scheme yet";
const RECEIPT_POLLING_DURATION: Duration = Duration::from_secs(5 * 60);
const TRACE_POLLING_DURATION: Duration = Duration::from_secs(60);
/// Create the node directory and call `geth init` to configure the genesis. /// Create the node directory and call `geth init` to configure the genesis.
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))] #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
@@ -102,6 +111,8 @@ impl GethNode {
serde_json::to_writer(File::create(&genesis_path)?, &genesis)?; serde_json::to_writer(File::create(&genesis_path)?, &genesis)?;
let mut child = Command::new(&self.geth) let mut child = Command::new(&self.geth)
.arg("--state.scheme")
.arg("hash")
.arg("init") .arg("init")
.arg("--datadir") .arg("--datadir")
.arg(&self.data_directory) .arg(&self.data_directory)
@@ -159,6 +170,12 @@ impl GethNode {
.arg("0") .arg("0")
.arg("--cache.blocklogs") .arg("--cache.blocklogs")
.arg("512") .arg("512")
.arg("--state.scheme")
.arg("hash")
.arg("--syncmode")
.arg("full")
.arg("--gcmode")
.arg("archive")
.stderr(stderr_logs_file.try_clone()?) .stderr(stderr_logs_file.try_clone()?)
.stdout(stdout_logs_file.try_clone()?) .stdout(stdout_logs_file.try_clone()?)
.spawn()? .spawn()?
@@ -248,22 +265,17 @@ impl GethNode {
} }
impl EthereumNode for GethNode { impl EthereumNode for GethNode {
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))] #[tracing::instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
async fn execute_transaction( async fn execute_transaction(
&self, &self,
transaction: TransactionRequest, transaction: TransactionRequest,
) -> anyhow::Result<alloy::rpc::types::TransactionReceipt> { ) -> anyhow::Result<alloy::rpc::types::TransactionReceipt> {
let outer_span = tracing::debug_span!("Submitting transaction", ?transaction); let span = tracing::debug_span!("Submitting transaction", ?transaction);
let _outer_guard = outer_span.enter();
let provider = self.provider().await?;
let pending_transaction = provider.send_transaction(transaction).await?;
let transaction_hash = pending_transaction.tx_hash();
let span = tracing::info_span!("Awaiting transaction receipt", ?transaction_hash);
let _guard = span.enter(); let _guard = span.enter();
let provider = Arc::new(self.provider().await?);
let transaction_hash = *provider.send_transaction(transaction).await?.tx_hash();
// The following is a fix for the "transaction indexing is in progress" error that we // The following is a fix for the "transaction indexing is in progress" error that we
// used to get. You can find more information on this in the following GH issue in geth // used to get. You can find more information on this in the following GH issue in geth
// https://github.com/ethereum/go-ethereum/issues/28877. To summarize what's going on, // https://github.com/ethereum/go-ethereum/issues/28877. To summarize what's going on,
@@ -282,57 +294,64 @@ impl EthereumNode for GethNode {
// allow for a larger wait time. Therefore, in here we allow for 5 minutes of waiting // allow for a larger wait time. Therefore, in here we allow for 5 minutes of waiting
// with exponential backoff each time we attempt to get the receipt and find that it's // with exponential backoff each time we attempt to get the receipt and find that it's
// not available. // not available.
let mut retries = 0; poll(
let mut total_wait_duration = Duration::from_secs(0); Self::RECEIPT_POLLING_DURATION,
let max_allowed_wait_duration = Duration::from_secs(5 * 60); Default::default(),
loop { move || {
if total_wait_duration >= max_allowed_wait_duration { let provider = provider.clone();
tracing::error!( async move {
?total_wait_duration, match provider.get_transaction_receipt(transaction_hash).await {
?max_allowed_wait_duration, Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)),
retry_count = retries, Ok(None) => Ok(ControlFlow::Continue(())),
"Failed to get receipt after polling for it" Err(error) => {
); let error_string = error.to_string();
anyhow::bail!( match error_string.contains(Self::TRANSACTION_INDEXING_ERROR) {
"Polled for receipt for {total_wait_duration:?} but failed to get it" true => Ok(ControlFlow::Continue(())),
); false => Err(error.into()),
} }
}
match provider.get_transaction_receipt(*transaction_hash).await {
Ok(Some(receipt)) => {
tracing::info!(?total_wait_duration, "Found receipt");
break Ok(receipt);
}
Ok(None) => {}
Err(error) => {
let error_string = error.to_string();
if !error_string.contains(Self::TRANSACTION_INDEXING_ERROR) {
break Err(error.into());
} }
} }
}; },
)
let next_wait_duration = Duration::from_secs(2u64.pow(retries)) .instrument(tracing::info_span!(
.min(max_allowed_wait_duration - total_wait_duration); "Awaiting transaction receipt",
total_wait_duration += next_wait_duration; ?transaction_hash
retries += 1; ))
.await
tokio::time::sleep(next_wait_duration).await;
}
} }
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))] #[tracing::instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
async fn trace_transaction( async fn trace_transaction(
&self, &self,
transaction: &TransactionReceipt, transaction: &TransactionReceipt,
trace_options: GethDebugTracingOptions, trace_options: GethDebugTracingOptions,
) -> anyhow::Result<alloy::rpc::types::trace::geth::GethTrace> { ) -> anyhow::Result<alloy::rpc::types::trace::geth::GethTrace> {
let tx_hash = transaction.transaction_hash; let provider = Arc::new(self.provider().await?);
Ok(self poll(
.provider() Self::TRACE_POLLING_DURATION,
.await? Default::default(),
.debug_trace_transaction(tx_hash, trace_options) move || {
.await?) let provider = provider.clone();
let trace_options = trace_options.clone();
async move {
match provider
.debug_trace_transaction(transaction.transaction_hash, trace_options)
.await
{
Ok(trace) => Ok(ControlFlow::Break(trace)),
Err(error) => {
let error_string = error.to_string();
match error_string.contains(Self::TRANSACTION_TRACING_ERROR) {
true => Ok(ControlFlow::Continue(())),
false => Err(error.into()),
}
}
}
}
},
)
.await
} }
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))] #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]