Compare commits

..

1 Commits

Author SHA1 Message Date
Omar Abdulla aa1b5621dd Use SolidityLang for solc downloads 2025-08-06 11:46:23 +03:00
7 changed files with 58 additions and 152 deletions
Generated
-1
View File
@@ -4030,7 +4030,6 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"semver 1.0.26", "semver 1.0.26",
"tokio",
] ]
[[package]] [[package]]
-1
View File
@@ -11,4 +11,3 @@ rust-version.workspace = true
[dependencies] [dependencies]
anyhow = { workspace = true } anyhow = { workspace = true }
semver = { workspace = true } semver = { workspace = true }
tokio = { workspace = true, default-features = false, features = ["time"] }
-3
View File
@@ -1,3 +0,0 @@
mod poll;
pub use poll::*;
-69
View File
@@ -1,69 +0,0 @@
use std::ops::ControlFlow;
use std::time::Duration;
use anyhow::{Result, anyhow};
const EXPONENTIAL_BACKOFF_MAX_WAIT_DURATION: Duration = Duration::from_secs(60);
/// A function that polls for a fallible future for some period of time and errors if it fails to
/// get a result after polling.
///
/// Given a future that returns a [`Result<ControlFlow<O, ()>>`], this function calls the future
/// repeatedly (with some wait period) until the future returns a [`ControlFlow::Break`] or until it
/// returns an [`Err`] in which case the function stops polling and returns the error.
///
/// If the future keeps returning [`ControlFlow::Continue`] and fails to return a [`Break`] within
/// the permitted polling duration then this function returns an [`Err`]
///
/// [`Break`]: ControlFlow::Break
/// [`Continue`]: ControlFlow::Continue
pub async fn poll<F, O>(
polling_duration: Duration,
polling_wait_behavior: PollingWaitBehavior,
mut future: impl FnMut() -> F,
) -> Result<O>
where
F: Future<Output = Result<ControlFlow<O, ()>>>,
{
let mut retries = 0;
let mut total_wait_duration = Duration::ZERO;
let max_allowed_wait_duration = polling_duration;
loop {
if total_wait_duration >= max_allowed_wait_duration {
break Err(anyhow!(
"Polling failed after {} retries and a total of {:?} of wait time",
retries,
total_wait_duration
));
}
match future().await? {
ControlFlow::Continue(()) => {
let next_wait_duration = match polling_wait_behavior {
PollingWaitBehavior::Constant(duration) => duration,
PollingWaitBehavior::ExponentialBackoff => {
Duration::from_secs(2u64.pow(retries))
.min(EXPONENTIAL_BACKOFF_MAX_WAIT_DURATION)
}
};
let next_wait_duration =
next_wait_duration.min(max_allowed_wait_duration - total_wait_duration);
total_wait_duration += next_wait_duration;
retries += 1;
tokio::time::sleep(next_wait_duration).await;
}
ControlFlow::Break(output) => {
break Ok(output);
}
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum PollingWaitBehavior {
Constant(Duration),
#[default]
ExponentialBackoff,
}
-1
View File
@@ -2,7 +2,6 @@
//! the workspace can benefit from. //! the workspace can benefit from.
pub mod fs; pub mod fs;
pub mod futures;
pub mod iterators; pub mod iterators;
pub mod macros; pub mod macros;
pub mod types; pub mod types;
+3 -3
View File
@@ -248,10 +248,10 @@ where
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
case_status.sort_by(|a, b| a.0.cmp(&b.0)); case_status.sort_by(|a, b| a.0.cmp(&b.0));
for (case_idx, case_name, case_status) in case_status.into_iter() { for (_, case_name, case_status) in case_status.into_iter() {
if case_status { if case_status {
eprintln!( eprintln!(
" {GREEN}Case Succeeded:{RESET} {} - Case Idx: {case_idx}", "{GREEN} Case Succeeded:{RESET} {}",
case_name case_name
.as_ref() .as_ref()
.map(|string| string.as_str()) .map(|string| string.as_str())
@@ -259,7 +259,7 @@ where
) )
} else { } else {
eprintln!( eprintln!(
" {RED}Case Failed:{RESET} {} - Case Idx: {case_idx}", "{RED} Case Failed:{RESET} {}",
case_name case_name
.as_ref() .as_ref()
.map(|string| string.as_str()) .map(|string| string.as_str())
+55 -74
View File
@@ -3,13 +3,9 @@
use std::{ use std::{
fs::{File, OpenOptions, create_dir_all, remove_dir_all}, fs::{File, OpenOptions, create_dir_all, remove_dir_all},
io::{BufRead, BufReader, Read, Write}, io::{BufRead, BufReader, Read, Write},
ops::ControlFlow,
path::PathBuf, path::PathBuf,
process::{Child, Command, Stdio}, process::{Child, Command, Stdio},
sync::{ sync::atomic::{AtomicU32, Ordering},
Arc,
atomic::{AtomicU32, Ordering},
},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@@ -29,12 +25,11 @@ use alloy::{
}, },
signers::local::PrivateKeySigner, signers::local::PrivateKeySigner,
}; };
use tracing::{Instrument, Level}; use revive_dt_common::fs::clear_directory;
use revive_dt_common::{fs::clear_directory, futures::poll};
use revive_dt_config::Arguments; use revive_dt_config::Arguments;
use revive_dt_format::traits::ResolverApi; use revive_dt_format::traits::ResolverApi;
use revive_dt_node_interaction::EthereumNode; use revive_dt_node_interaction::EthereumNode;
use tracing::Level;
use crate::{Node, common::FallbackGasFiller, constants::INITIAL_BALANCE}; use crate::{Node, common::FallbackGasFiller, constants::INITIAL_BALANCE};
@@ -82,10 +77,6 @@ impl GethNode {
const GETH_STDERR_LOG_FILE_NAME: &str = "node_stderr.log"; const GETH_STDERR_LOG_FILE_NAME: &str = "node_stderr.log";
const TRANSACTION_INDEXING_ERROR: &str = "transaction indexing is in progress"; const TRANSACTION_INDEXING_ERROR: &str = "transaction indexing is in progress";
const TRANSACTION_TRACING_ERROR: &str = "historical state not available in path scheme yet";
const RECEIPT_POLLING_DURATION: Duration = Duration::from_secs(5 * 60);
const TRACE_POLLING_DURATION: Duration = Duration::from_secs(60);
/// Create the node directory and call `geth init` to configure the genesis. /// Create the node directory and call `geth init` to configure the genesis.
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))] #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
@@ -111,8 +102,6 @@ impl GethNode {
serde_json::to_writer(File::create(&genesis_path)?, &genesis)?; serde_json::to_writer(File::create(&genesis_path)?, &genesis)?;
let mut child = Command::new(&self.geth) let mut child = Command::new(&self.geth)
.arg("--state.scheme")
.arg("hash")
.arg("init") .arg("init")
.arg("--datadir") .arg("--datadir")
.arg(&self.data_directory) .arg(&self.data_directory)
@@ -170,12 +159,6 @@ impl GethNode {
.arg("0") .arg("0")
.arg("--cache.blocklogs") .arg("--cache.blocklogs")
.arg("512") .arg("512")
.arg("--state.scheme")
.arg("hash")
.arg("--syncmode")
.arg("full")
.arg("--gcmode")
.arg("archive")
.stderr(stderr_logs_file.try_clone()?) .stderr(stderr_logs_file.try_clone()?)
.stdout(stdout_logs_file.try_clone()?) .stdout(stdout_logs_file.try_clone()?)
.spawn()? .spawn()?
@@ -265,16 +248,21 @@ impl GethNode {
} }
impl EthereumNode for GethNode { impl EthereumNode for GethNode {
#[tracing::instrument(level = "info", skip_all, fields(geth_node_id = self.id))] #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
async fn execute_transaction( async fn execute_transaction(
&self, &self,
transaction: TransactionRequest, transaction: TransactionRequest,
) -> anyhow::Result<alloy::rpc::types::TransactionReceipt> { ) -> anyhow::Result<alloy::rpc::types::TransactionReceipt> {
let span = tracing::debug_span!("Submitting transaction", ?transaction); let outer_span = tracing::debug_span!("Submitting transaction", ?transaction);
let _guard = span.enter(); let _outer_guard = outer_span.enter();
let provider = Arc::new(self.provider().await?); let provider = self.provider().await?;
let transaction_hash = *provider.send_transaction(transaction).await?.tx_hash();
let pending_transaction = provider.send_transaction(transaction).await?;
let transaction_hash = pending_transaction.tx_hash();
let span = tracing::info_span!("Awaiting transaction receipt", ?transaction_hash);
let _guard = span.enter();
// The following is a fix for the "transaction indexing is in progress" error that we // The following is a fix for the "transaction indexing is in progress" error that we
// used to get. You can find more information on this in the following GH issue in geth // used to get. You can find more information on this in the following GH issue in geth
@@ -294,64 +282,57 @@ impl EthereumNode for GethNode {
// allow for a larger wait time. Therefore, in here we allow for 5 minutes of waiting // allow for a larger wait time. Therefore, in here we allow for 5 minutes of waiting
// with exponential backoff each time we attempt to get the receipt and find that it's // with exponential backoff each time we attempt to get the receipt and find that it's
// not available. // not available.
poll( let mut retries = 0;
Self::RECEIPT_POLLING_DURATION, let mut total_wait_duration = Duration::from_secs(0);
Default::default(), let max_allowed_wait_duration = Duration::from_secs(5 * 60);
move || { loop {
let provider = provider.clone(); if total_wait_duration >= max_allowed_wait_duration {
async move { tracing::error!(
match provider.get_transaction_receipt(transaction_hash).await { ?total_wait_duration,
Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)), ?max_allowed_wait_duration,
Ok(None) => Ok(ControlFlow::Continue(())), retry_count = retries,
Err(error) => { "Failed to get receipt after polling for it"
let error_string = error.to_string(); );
match error_string.contains(Self::TRANSACTION_INDEXING_ERROR) { anyhow::bail!(
true => Ok(ControlFlow::Continue(())), "Polled for receipt for {total_wait_duration:?} but failed to get it"
false => Err(error.into()), );
} }
}
match provider.get_transaction_receipt(*transaction_hash).await {
Ok(Some(receipt)) => {
tracing::info!(?total_wait_duration, "Found receipt");
break Ok(receipt);
}
Ok(None) => {}
Err(error) => {
let error_string = error.to_string();
if !error_string.contains(Self::TRANSACTION_INDEXING_ERROR) {
break Err(error.into());
} }
} }
}, };
)
.instrument(tracing::info_span!( let next_wait_duration = Duration::from_secs(2u64.pow(retries))
"Awaiting transaction receipt", .min(max_allowed_wait_duration - total_wait_duration);
?transaction_hash total_wait_duration += next_wait_duration;
)) retries += 1;
.await
tokio::time::sleep(next_wait_duration).await;
}
} }
#[tracing::instrument(level = "info", skip_all, fields(geth_node_id = self.id))] #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
async fn trace_transaction( async fn trace_transaction(
&self, &self,
transaction: &TransactionReceipt, transaction: &TransactionReceipt,
trace_options: GethDebugTracingOptions, trace_options: GethDebugTracingOptions,
) -> anyhow::Result<alloy::rpc::types::trace::geth::GethTrace> { ) -> anyhow::Result<alloy::rpc::types::trace::geth::GethTrace> {
let provider = Arc::new(self.provider().await?); let tx_hash = transaction.transaction_hash;
poll( Ok(self
Self::TRACE_POLLING_DURATION, .provider()
Default::default(), .await?
move || { .debug_trace_transaction(tx_hash, trace_options)
let provider = provider.clone(); .await?)
let trace_options = trace_options.clone();
async move {
match provider
.debug_trace_transaction(transaction.transaction_hash, trace_options)
.await
{
Ok(trace) => Ok(ControlFlow::Break(trace)),
Err(error) => {
let error_string = error.to_string();
match error_string.contains(Self::TRANSACTION_TRACING_ERROR) {
true => Ok(ControlFlow::Continue(())),
false => Err(error.into()),
}
}
}
}
},
)
.await
} }
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))] #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]