From 390402b6cc24eb28488645af5d43a4bee616e9c7 Mon Sep 17 00:00:00 2001 From: Omar Abdulla Date: Fri, 1 Aug 2025 11:11:50 +0300 Subject: [PATCH] Make codebase async --- Cargo.lock | 48 ++- Cargo.toml | 5 +- .../common/src/concepts/blocking_executor.rs | 225 ------------- crates/common/src/concepts/mod.rs | 2 - crates/compiler/Cargo.toml | 1 + crates/compiler/src/lib.rs | 11 +- crates/compiler/src/revive_resolc.rs | 12 +- crates/compiler/src/solc.rs | 10 +- crates/compiler/tests/lib.rs | 14 +- crates/core/Cargo.toml | 3 +- crates/core/src/driver/mod.rs | 165 +++++----- crates/core/src/main.rs | 215 +++++++------ crates/format/Cargo.toml | 3 + crates/format/src/input.rs | 234 ++++++++------ crates/format/src/traits.rs | 17 +- crates/node-interaction/src/lib.rs | 9 +- crates/node/src/geth.rs | 296 ++++++++---------- crates/node/src/kitchensink.rs | 203 ++++++------ 18 files changed, 679 insertions(+), 794 deletions(-) delete mode 100644 crates/common/src/concepts/blocking_executor.rs diff --git a/Cargo.lock b/Cargo.lock index 160dc91..e7daebb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2713,7 +2713,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.5.10", "system-configuration", "tokio", "tower-service", @@ -2953,6 +2953,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "io-uring" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" +dependencies = [ + "bitflags 2.9.1", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -4043,6 +4054,7 @@ dependencies = [ "semver 1.0.26", "serde", "serde_json", + "tokio", "tracing", ] @@ -4064,8 +4076,8 @@ dependencies = [ "alloy", "anyhow", "clap", + "futures", "indexmap 2.10.0", - "rayon", "revive-dt-common", "revive-dt-compiler", "revive-dt-config", @@ -4075,6 +4087,7 @@ dependencies = [ "revive-dt-report", "semver 1.0.26", "temp-dir", + "tokio", "tracing", "tracing-subscriber", ] @@ -4091,6 +4104,7 @@ dependencies = [ "semver 1.0.26", "serde", "serde_json", + "tokio", "tracing", ] @@ -4701,6 +4715,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -4745,6 +4768,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "sp-application-crypto" version = "40.1.0" @@ -5404,18 +5437,21 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.45.1" +version = "1.47.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" +checksum = "43864ed400b6043a4757a25c7a64a8efde741aed79a056a2fb348a406701bb35" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio", "pin-project-lite", - "socket2", + "signal-hook-registry", + "slab", + "socket2 0.6.0", "tokio-macros", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9855d12..2587665 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,6 @@ futures = { version = "0.3.31" } hex = "0.4.3" reqwest = { version = "0.12.15", features = ["blocking", "json"] } once_cell = "1.21" -rayon = { version = "1.10" } semver = { version = "1.0", features = ["serde"] } serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = { version = "1.0", default-features = false, features = [ @@ -43,8 +42,10 @@ sp-core = "36.1.0" sp-runtime = "41.1.0" temp-dir = { version = "0.1.16" } tempfile = "3.3" -tokio = { version = "1", default-features = false, features = [ +tokio = { version = "1.47.0", default-features = false, features = [ "rt-multi-thread", + "process", + "rt", ] } uuid = { version = "1.8", features = ["v4"] } tracing = "0.1.41" diff --git a/crates/common/src/concepts/blocking_executor.rs b/crates/common/src/concepts/blocking_executor.rs deleted file mode 100644 index b07b7fd..0000000 --- a/crates/common/src/concepts/blocking_executor.rs +++ /dev/null @@ -1,225 +0,0 @@ -//! The alloy crate __requires__ a tokio runtime. -//! We contain any async rust right here. - -use std::{any::Any, panic::AssertUnwindSafe, pin::Pin, thread}; - -use futures::FutureExt; -use once_cell::sync::Lazy; -use tokio::{ - runtime::Builder, - sync::{mpsc::UnboundedSender, oneshot}, -}; -use tracing::Instrument; - -/// A blocking async executor. -/// -/// This struct exposes the abstraction of a blocking async executor. It is a global and static -/// executor which means that it doesn't require for new instances of it to be created, it's a -/// singleton and can be accessed by any thread that wants to perform some async computation on the -/// blocking executor thread. -/// -/// The API of the blocking executor is created in a way so that it's very natural, simple to use, -/// and unbounded to specific tasks or return types. The following is an example of using this -/// executor to drive an async computation: -/// -/// ```rust -/// use revive_dt_common::concepts::*; -/// -/// fn blocking_function() { -/// let result = BlockingExecutor::execute(async move { -/// tokio::time::sleep(std::time::Duration::from_secs(1)).await; -/// 0xFFu8 -/// }) -/// .expect("Computation failed"); -/// -/// assert_eq!(result, 0xFF); -/// } -/// ``` -/// -/// Users get to pass in their async tasks without needing to worry about putting them in a [`Box`], -/// [`Pin`], needing to perform down-casting, or the internal channel mechanism used by the runtime. -/// To the user, it just looks like a function that converts some async code into sync code. -/// -/// This struct also handled panics that occur in the passed futures and converts them into errors -/// that can be handled by the user. This is done to allow the executor to be robust. -/// -/// Internally, the executor communicates with the tokio runtime thread through channels which carry -/// the [`TaskMessage`] and the results of the execution. -pub struct BlockingExecutor; - -impl BlockingExecutor { - pub fn execute(future: impl Future + Send + 'static) -> Result - where - R: Send + 'static, - { - // Note: The blocking executor is a singleton and therefore we store its state in a static - // so that it's assigned only once. Additionally, when we set the state of the executor we - // spawn the thread where the async runtime runs. - static STATE: Lazy = Lazy::new(|| { - tracing::trace!("Initializing the BlockingExecutor state"); - - // All communication with the tokio runtime thread happens over mspc channels where the - // producers here are the threads that want to run async tasks and the consumer here is - // the tokio runtime thread. - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); - - thread::spawn(move || { - tracing::info!( - thread_id = ?std::thread::current().id(), - "Starting async runtime thread" - ); - - let runtime = Builder::new_current_thread() - .enable_all() - .build() - .expect("Failed to create the async runtime"); - - runtime.block_on(async move { - while let Some(TaskMessage { - future: task, - response_tx: response_channel, - }) = rx.recv().await - { - tracing::trace!("Received a new future to execute"); - tokio::spawn(async move { - // One of the things that the blocking executor does is that it allows - // us to catch panics if they occur. By wrapping the given future in an - // AssertUnwindSafe::catch_unwind we are able to catch all panic unwinds - // in the given future and convert them into errors. - let task = AssertUnwindSafe(task).catch_unwind(); - - let result = task.await; - let _ = response_channel.send(result); - }); - } - }) - }); - - ExecutorState { tx } - }); - - // We need to perform blocking synchronous communication between the current thread and the - // tokio runtime thread with the result of the async computation and the oneshot channels - // from tokio allows us to do that. The sender side of the channel will be given to the - // tokio runtime thread to send the result when the computation is completed and the receive - // side of the channel will be kept with this thread to await for the response of the async - // task to come back. - let (response_tx, response_rx) = - oneshot::channel::, Box>>(); - - // The tokio runtime thread expects a Future> + Send to be - // sent to it to execute. However, this function has a typed Future + Send and - // therefore we need to change the type of the future to fit what the runtime thread expects - // in the task message. In doing this conversion, we lose some of the type information since - // we're converting R => dyn Any. However, we will perform down-casting on the result to - // convert it back into R. - let future = Box::pin( - async move { Box::new(future.await) as Box }.in_current_span(), - ); - - let task = TaskMessage::new(future, response_tx); - if let Err(error) = STATE.tx.send(task) { - tracing::error!(?error, "Failed to send the task to the blocking executor"); - anyhow::bail!("Failed to send the task to the blocking executor: {error:?}") - } - - let result = match response_rx.blocking_recv() { - Ok(result) => result, - Err(error) => { - tracing::error!( - ?error, - "Failed to get the response from the blocking executor" - ); - anyhow::bail!("Failed to get the response from the blocking executor: {error:?}") - } - }; - - let result = match result { - Ok(result) => result, - Err(error) => { - tracing::error!(?error, "An error occurred when running the async task"); - anyhow::bail!("An error occurred when running the async task: {error:?}") - } - }; - - Ok(*result - .downcast::() - .expect("An error occurred when downcasting into R. This is a bug")) - } -} -/// Represents the state of the async runtime. This runtime is designed to be a singleton runtime -/// which means that in the current running program there's just a single thread that has an async -/// runtime. -struct ExecutorState { - /// The sending side of the task messages channel. This is used by all of the other threads to - /// communicate with the async runtime thread. - tx: UnboundedSender, -} - -/// Represents a message that contains an asynchronous task that's to be executed by the runtime -/// as well as a way for the runtime to report back on the result of the execution. -struct TaskMessage { - /// The task that's being requested to run. This is a future that returns an object that does - /// implement [`Any`] and [`Send`] to allow it to be sent between the requesting thread and the - /// async thread. - future: Pin> + Send>>, - - /// A one shot sender channel where the sender of the task is expecting to hear back on the - /// result of the task. - response_tx: oneshot::Sender, Box>>, -} - -impl TaskMessage { - pub fn new( - future: Pin> + Send>>, - response_tx: oneshot::Sender, Box>>, - ) -> Self { - Self { - future, - response_tx, - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn simple_future_works() { - // Act - let result = BlockingExecutor::execute(async move { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - 0xFFu8 - }) - .unwrap(); - - // Assert - assert_eq!(result, 0xFFu8); - } - - #[test] - #[allow(unreachable_code, clippy::unreachable)] - fn panics_in_futures_are_caught() { - // Act - let result = BlockingExecutor::execute(async move { - panic!( - "If this panic causes, well, a panic, then this is an issue. If it's caught then all good!" - ); - 0xFFu8 - }); - - // Assert - assert!(result.is_err()); - - // Act - let result = BlockingExecutor::execute(async move { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - 0xFFu8 - }) - .unwrap(); - - // Assert - assert_eq!(result, 0xFFu8) - } -} diff --git a/crates/common/src/concepts/mod.rs b/crates/common/src/concepts/mod.rs index cf633e9..8b13789 100644 --- a/crates/common/src/concepts/mod.rs +++ b/crates/common/src/concepts/mod.rs @@ -1,3 +1 @@ -mod blocking_executor; -pub use blocking_executor::*; diff --git a/crates/compiler/Cargo.toml b/crates/compiler/Cargo.toml index 295a147..9e10a10 100644 --- a/crates/compiler/Cargo.toml +++ b/crates/compiler/Cargo.toml @@ -23,3 +23,4 @@ semver = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tracing = { workspace = true } +tokio = { workspace = true } diff --git a/crates/compiler/src/lib.rs b/crates/compiler/src/lib.rs index 364359c..a9c5df5 100644 --- a/crates/compiler/src/lib.rs +++ b/crates/compiler/src/lib.rs @@ -33,7 +33,7 @@ pub trait SolidityCompiler { &self, input: CompilerInput, additional_options: Self::Options, - ) -> anyhow::Result; + ) -> impl Future>; fn new(solc_executable: PathBuf) -> Self; @@ -147,8 +147,13 @@ where self } - pub fn try_build(self, compiler_path: impl AsRef) -> anyhow::Result { - T::new(compiler_path.as_ref().to_path_buf()).build(self.input, self.additional_options) + pub async fn try_build( + self, + compiler_path: impl AsRef, + ) -> anyhow::Result { + T::new(compiler_path.as_ref().to_path_buf()) + .build(self.input, self.additional_options) + .await } pub fn input(&self) -> CompilerInput { diff --git a/crates/compiler/src/revive_resolc.rs b/crates/compiler/src/revive_resolc.rs index 507539e..b1f3bca 100644 --- a/crates/compiler/src/revive_resolc.rs +++ b/crates/compiler/src/revive_resolc.rs @@ -6,7 +6,6 @@ use std::{ process::{Command, Stdio}, }; -use alloy::json_abi::JsonAbi; use revive_dt_common::types::VersionOrRequirement; use revive_dt_config::Arguments; use revive_solc_json_interface::{ @@ -17,8 +16,10 @@ use revive_solc_json_interface::{ use crate::{CompilerInput, CompilerOutput, SolidityCompiler}; +use alloy::json_abi::JsonAbi; use anyhow::Context; use semver::Version; +use tokio::{io::AsyncWriteExt, process::Command as AsyncCommand}; // TODO: I believe that we need to also pass the solc compiler to resolc so that resolc uses the // specified solc compiler. I believe that currently we completely ignore the specified solc binary @@ -35,7 +36,7 @@ impl SolidityCompiler for Resolc { type Options = Vec; #[tracing::instrument(level = "debug", ret)] - fn build( + async fn build( &self, CompilerInput { enable_optimization, @@ -87,7 +88,7 @@ impl SolidityCompiler for Resolc { }, }; - let mut command = Command::new(&self.resolc_path); + let mut command = AsyncCommand::new(&self.resolc_path); command .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -109,9 +110,10 @@ impl SolidityCompiler for Resolc { let mut child = command.spawn()?; let stdin_pipe = child.stdin.as_mut().expect("stdin must be piped"); - serde_json::to_writer(stdin_pipe, &input)?; + let serialized_input = serde_json::to_vec(&input)?; + stdin_pipe.write_all(&serialized_input).await?; - let output = child.wait_with_output()?; + let output = child.wait_with_output().await?; let stdout = output.stdout; let stderr = output.stderr; diff --git a/crates/compiler/src/solc.rs b/crates/compiler/src/solc.rs index 20f5e6d..cb12e55 100644 --- a/crates/compiler/src/solc.rs +++ b/crates/compiler/src/solc.rs @@ -21,6 +21,7 @@ use foundry_compilers_artifacts::{ solc::*, }; use semver::Version; +use tokio::{io::AsyncWriteExt, process::Command as AsyncCommand}; #[derive(Debug)] pub struct Solc { @@ -31,7 +32,7 @@ impl SolidityCompiler for Solc { type Options = (); #[tracing::instrument(level = "debug", ret)] - fn build( + async fn build( &self, CompilerInput { enable_optimization, @@ -90,7 +91,7 @@ impl SolidityCompiler for Solc { }, }; - let mut command = Command::new(&self.solc_path); + let mut command = AsyncCommand::new(&self.solc_path); command .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -112,8 +113,9 @@ impl SolidityCompiler for Solc { let mut child = command.spawn()?; let stdin = child.stdin.as_mut().expect("should be piped"); - serde_json::to_writer(stdin, &input)?; - let output = child.wait_with_output()?; + let serialized_input = serde_json::to_vec(&input)?; + stdin.write_all(&serialized_input).await?; + let output = child.wait_with_output().await?; if !output.status.success() { let json_in = serde_json::to_string_pretty(&input)?; diff --git a/crates/compiler/tests/lib.rs b/crates/compiler/tests/lib.rs index 63e07e7..6815b62 100644 --- a/crates/compiler/tests/lib.rs +++ b/crates/compiler/tests/lib.rs @@ -4,8 +4,8 @@ use revive_dt_compiler::{Compiler, SolidityCompiler, revive_resolc::Resolc, solc use revive_dt_config::Arguments; use semver::Version; -#[test] -fn contracts_can_be_compiled_with_solc() { +#[tokio::test] +async fn contracts_can_be_compiled_with_solc() { // Arrange let args = Arguments::default(); let compiler_path = Solc::get_compiler_executable(&args, Version::new(0, 8, 30)).unwrap(); @@ -16,7 +16,8 @@ fn contracts_can_be_compiled_with_solc() { .unwrap() .with_source("./tests/assets/array_one_element/main.sol") .unwrap() - .try_build(compiler_path); + .try_build(compiler_path) + .await; // Assert let output = output.expect("Failed to compile"); @@ -42,8 +43,8 @@ fn contracts_can_be_compiled_with_solc() { assert!(callable_file_contracts.contains_key("Callable")); } -#[test] -fn contracts_can_be_compiled_with_resolc() { +#[tokio::test] +async fn contracts_can_be_compiled_with_resolc() { // Arrange let args = Arguments::default(); let compiler_path = Resolc::get_compiler_executable(&args, Version::new(0, 8, 30)).unwrap(); @@ -54,7 +55,8 @@ fn contracts_can_be_compiled_with_resolc() { .unwrap() .with_source("./tests/assets/array_one_element/main.sol") .unwrap() - .try_build(compiler_path); + .try_build(compiler_path) + .await; // Assert let output = output.expect("Failed to compile"); diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index a1da05d..3966618 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -24,9 +24,10 @@ revive-dt-report = { workspace = true } alloy = { workspace = true } anyhow = { workspace = true } clap = { workspace = true } +futures = { workspace = true } indexmap = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } -rayon = { workspace = true } semver = { workspace = true } temp-dir = { workspace = true } diff --git a/crates/core/src/driver/mod.rs b/crates/core/src/driver/mod.rs index eb84ae3..fc06ca1 100644 --- a/crates/core/src/driver/mod.rs +++ b/crates/core/src/driver/mod.rs @@ -67,24 +67,31 @@ where } } - pub fn handle_input( + pub async fn handle_input( &mut self, metadata: &Metadata, case_idx: CaseIdx, input: &Input, node: &T::Blockchain, ) -> anyhow::Result<(TransactionReceipt, GethTrace, DiffMode)> { - let deployment_receipts = - self.handle_contract_deployment(metadata, case_idx, input, node)?; - let execution_receipt = self.handle_input_execution(input, deployment_receipts, node)?; - let tracing_result = self.handle_input_call_frame_tracing(&execution_receipt, node)?; + let deployment_receipts = self + .handle_contract_deployment(metadata, case_idx, input, node) + .await?; + let execution_receipt = self + .handle_input_execution(input, deployment_receipts, node) + .await?; + let tracing_result = self + .handle_input_call_frame_tracing(&execution_receipt, node) + .await?; self.handle_input_variable_assignment(input, &tracing_result)?; - self.handle_input_expectations(input, &execution_receipt, node, &tracing_result)?; + self.handle_input_expectations(input, &execution_receipt, node, &tracing_result) + .await?; self.handle_input_diff(case_idx, execution_receipt, node) + .await } /// Handles the contract deployment for a given input performing it if it needs to be performed. - fn handle_contract_deployment( + async fn handle_contract_deployment( &mut self, metadata: &Metadata, case_idx: CaseIdx, @@ -121,14 +128,17 @@ where .then_some(input.value) .flatten(); - if let (_, _, Some(receipt)) = self.get_or_deploy_contract_instance( - &instance, - metadata, - input.caller, - calldata, - value, - node, - )? { + if let (_, _, Some(receipt)) = self + .get_or_deploy_contract_instance( + &instance, + metadata, + input.caller, + calldata, + value, + node, + ) + .await? + { receipts.insert(instance.clone(), receipt); } } @@ -137,7 +147,7 @@ where } /// Handles the execution of the input in terms of the calls that need to be made. - fn handle_input_execution( + async fn handle_input_execution( &mut self, input: &Input, mut deployment_receipts: HashMap, @@ -150,22 +160,23 @@ where .remove(&input.instance) .context("Failed to find deployment receipt"), Method::Fallback | Method::FunctionName(_) => { - let tx = - match input.legacy_transaction(&self.deployed_contracts, &self.variables, node) - { - Ok(tx) => { - tracing::debug!("Legacy transaction data: {tx:#?}"); - tx - } - Err(err) => { - tracing::error!("Failed to construct legacy transaction: {err:?}"); - return Err(err); - } - }; + let tx = match input + .legacy_transaction(&self.deployed_contracts, &self.variables, node) + .await + { + Ok(tx) => { + tracing::debug!("Legacy transaction data: {tx:#?}"); + tx + } + Err(err) => { + tracing::error!("Failed to construct legacy transaction: {err:?}"); + return Err(err); + } + }; tracing::trace!("Executing transaction for input: {input:?}"); - match node.execute_transaction(tx) { + match node.execute_transaction(tx).await { Ok(receipt) => Ok(receipt), Err(err) => { tracing::error!( @@ -180,7 +191,7 @@ where } } - fn handle_input_call_frame_tracing( + async fn handle_input_call_frame_tracing( &self, execution_receipt: &TransactionReceipt, node: &T::Blockchain, @@ -194,6 +205,7 @@ where ..Default::default() }, ) + .await .map(|trace| { trace .try_into_call_frame() @@ -226,7 +238,7 @@ where Ok(()) } - fn handle_input_expectations( + async fn handle_input_expectations( &mut self, input: &Input, execution_receipt: &TransactionReceipt, @@ -270,13 +282,14 @@ where node, expectation, tracing_result, - )?; + ) + .await?; } Ok(()) } - fn handle_input_expectation_item( + async fn handle_input_expectation_item( &mut self, execution_receipt: &TransactionReceipt, node: &T::Blockchain, @@ -313,12 +326,15 @@ where if let Some(ref expected_calldata) = expectation.return_data { let expected = expected_calldata; let actual = &tracing_result.output.as_ref().unwrap_or_default(); - if !expected.is_equivalent( - actual, - deployed_contracts, - &*variables, - chain_state_provider, - )? { + if !expected + .is_equivalent( + actual, + deployed_contracts, + &*variables, + chain_state_provider, + ) + .await? + { tracing::error!( ?execution_receipt, ?expected, @@ -349,7 +365,8 @@ where if let Some(ref expected_address) = expected_event.address { let expected = Address::from_slice( Calldata::new_compound([expected_address]) - .calldata(deployed_contracts, &*variables, node)? + .calldata(deployed_contracts, &*variables, node) + .await? .get(12..32) .expect("Can't fail"), ); @@ -374,12 +391,15 @@ where .zip(actual_event.topics()) { let expected = Calldata::new_compound([expected]); - if !expected.is_equivalent( - &actual.0, - deployed_contracts, - &*variables, - chain_state_provider, - )? { + if !expected + .is_equivalent( + &actual.0, + deployed_contracts, + &*variables, + chain_state_provider, + ) + .await? + { tracing::error!( ?execution_receipt, ?expected, @@ -395,12 +415,15 @@ where // Handling the values assertion. let expected = &expected_event.values; let actual = &actual_event.data().data; - if !expected.is_equivalent( - &actual.0, - deployed_contracts, - &*variables, - chain_state_provider, - )? { + if !expected + .is_equivalent( + &actual.0, + deployed_contracts, + &*variables, + chain_state_provider, + ) + .await? + { tracing::error!( ?execution_receipt, ?expected, @@ -417,7 +440,7 @@ where Ok(()) } - fn handle_input_diff( + async fn handle_input_diff( &mut self, _: CaseIdx, execution_receipt: TransactionReceipt, @@ -432,8 +455,10 @@ where disable_storage: None, }); - let trace = node.trace_transaction(&execution_receipt, trace_options)?; - let diff = node.state_diff(&execution_receipt)?; + let trace = node + .trace_transaction(&execution_receipt, trace_options) + .await?; + let diff = node.state_diff(&execution_receipt).await?; Ok((execution_receipt, trace, diff)) } @@ -444,7 +469,7 @@ where /// If a [`CaseIdx`] is not specified then this contact instance address will be stored in the /// cross-case deployed contracts address mapping. #[allow(clippy::too_many_arguments)] - pub fn get_or_deploy_contract_instance( + pub async fn get_or_deploy_contract_instance( &mut self, contract_instance: &ContractInstance, metadata: &Metadata, @@ -500,7 +525,9 @@ where }; if let Some(calldata) = calldata { - let calldata = calldata.calldata(&self.deployed_contracts, None, node)?; + let calldata = calldata + .calldata(&self.deployed_contracts, None, node) + .await?; code.extend(calldata); } @@ -513,7 +540,7 @@ where TransactionBuilder::::with_deploy_code(tx, code) }; - let receipt = match node.execute_transaction(tx) { + let receipt = match node.execute_transaction(tx).await { Ok(receipt) => receipt, Err(error) => { tracing::error!( @@ -604,7 +631,7 @@ where } } - pub fn execute(&mut self) -> anyhow::Result { + pub async fn execute(&mut self) -> anyhow::Result { if !self .leader_node .matches_target(self.metadata.targets.as_deref()) @@ -624,18 +651,14 @@ where let tracing_span = tracing::info_span!("Handling input", input_idx); let _guard = tracing_span.enter(); - let (leader_receipt, _, leader_diff) = self.leader_state.handle_input( - self.metadata, - self.case_idx, - &input, - self.leader_node, - )?; - let (follower_receipt, _, follower_diff) = self.follower_state.handle_input( - self.metadata, - self.case_idx, - &input, - self.follower_node, - )?; + let (leader_receipt, _, leader_diff) = self + .leader_state + .handle_input(self.metadata, self.case_idx, &input, self.leader_node) + .await?; + let (follower_receipt, _, follower_diff) = self + .follower_state + .handle_input(self.metadata, self.case_idx, &input, self.follower_node) + .await?; if leader_diff == follower_diff { tracing::debug!("State diffs match between leader and follower."); diff --git a/crates/core/src/main.rs b/crates/core/src/main.rs index 59bea7b..8f14861 100644 --- a/crates/core/src/main.rs +++ b/crates/core/src/main.rs @@ -1,7 +1,7 @@ use std::{ collections::HashMap, path::Path, - sync::{Arc, LazyLock, Mutex, RwLock}, + sync::{Arc, LazyLock}, }; use alloy::{ @@ -12,12 +12,13 @@ use alloy::{ }; use anyhow::Context; use clap::Parser; -use rayon::{ThreadPoolBuilder, prelude::*}; +use futures::StreamExt; use revive_dt_common::iterators::FilesWithExtensionIterator; use revive_dt_node_interaction::EthereumNode; use semver::Version; use temp_dir::TempDir; -use tracing::Level; +use tokio::sync::{Mutex, RwLock}; +use tracing::{Instrument, Level}; use tracing_subscriber::{EnvFilter, FmtSubscriber}; use revive_dt_compiler::SolidityCompiler; @@ -51,18 +52,24 @@ type CompilationCache<'a> = Arc< fn main() -> anyhow::Result<()> { let args = init_cli()?; - for (corpus, tests) in collect_corpora(&args)? { - let span = Span::new(corpus, args.clone())?; - - match &args.compile_only { - Some(platform) => compile_corpus(&args, &tests, platform, span), - None => execute_corpus(&args, &tests, span)?, + let body = async { + for (corpus, tests) in collect_corpora(&args)? { + let span = Span::new(corpus, args.clone())?; + match &args.compile_only { + Some(platform) => compile_corpus(&args, &tests, platform, span).await, + None => execute_corpus(&args, &tests, span).await?, + } + Report::save()?; } + Ok(()) + }; - Report::save()?; - } - - Ok(()) + tokio::runtime::Builder::new_multi_thread() + .worker_threads(args.number_of_threads) + .enable_all() + .build() + .expect("Failed building the Runtime") + .block_on(body) } fn init_cli() -> anyhow::Result { @@ -93,10 +100,6 @@ fn init_cli() -> anyhow::Result { } tracing::info!("workdir: {}", args.directory().display()); - ThreadPoolBuilder::new() - .num_threads(args.number_of_threads) - .build_global()?; - Ok(args) } @@ -114,7 +117,11 @@ fn collect_corpora(args: &Arguments) -> anyhow::Result(args: &Arguments, tests: &[MetadataFile], span: Span) -> anyhow::Result<()> +async fn run_driver( + args: &Arguments, + tests: &[MetadataFile], + span: Span, +) -> anyhow::Result<()> where L: Platform, F: Platform, @@ -146,42 +153,52 @@ where .collect::>(); let compilation_cache = Arc::new(RwLock::new(HashMap::new())); - test_cases.into_par_iter().for_each( - |(metadata_file_path, metadata, case_idx, case, solc_mode)| { - let tracing_span = tracing::span!( - Level::INFO, - "Running driver", - metadata_file_path = %metadata_file_path.display(), - case_idx = case_idx, - solc_mode = ?solc_mode, - ); - let _guard = tracing_span.enter(); - - let result = handle_case_driver::( - metadata_file_path.as_path(), - metadata, - case_idx.into(), - case, - solc_mode, - args, - compilation_cache.clone(), - leader_nodes.round_robbin(), - follower_nodes.round_robbin(), - span, - ); - match result { - Ok(inputs_executed) => tracing::info!(inputs_executed, "Execution succeeded"), - Err(error) => tracing::info!(%error, "Execution failed"), - } - tracing::info!("Execution completed"); - }, - ); + futures::stream::iter(test_cases) + .for_each_concurrent( + None, + |(metadata_file_path, metadata, case_idx, case, solc_mode)| { + let compilation_cache = compilation_cache.clone(); + let leader_node = leader_nodes.round_robbin(); + let follower_node = follower_nodes.round_robbin(); + let tracing_span = tracing::span!( + Level::INFO, + "Running driver", + metadata_file_path = %metadata_file_path.display(), + case_idx = case_idx, + solc_mode = ?solc_mode, + ); + async move { + let result = handle_case_driver::( + metadata_file_path.as_path(), + metadata, + case_idx.into(), + case, + solc_mode, + args, + compilation_cache.clone(), + leader_node, + follower_node, + span, + ) + .await; + match result { + Ok(inputs_executed) => { + tracing::info!(inputs_executed, "Execution succeeded") + } + Err(error) => tracing::info!(%error, "Execution failed"), + } + tracing::info!("Execution completed"); + } + .instrument(tracing_span) + }, + ) + .await; Ok(()) } #[allow(clippy::too_many_arguments)] -fn handle_case_driver<'a, L, F>( +async fn handle_case_driver<'a, L, F>( metadata_file_path: &'a Path, metadata: &'a Metadata, case_idx: CaseIdx, @@ -206,7 +223,8 @@ where config, compilation_cache.clone(), &HashMap::new(), - )?; + ) + .await?; let follower_pre_link_contracts = get_or_build_contracts::( metadata, metadata_file_path, @@ -214,7 +232,8 @@ where config, compilation_cache.clone(), &HashMap::new(), - )?; + ) + .await?; let mut leader_deployed_libraries = HashMap::new(); let mut follower_deployed_libraries = HashMap::new(); @@ -288,7 +307,7 @@ where follower_code, ); - let leader_receipt = match leader_node.execute_transaction(leader_tx) { + let leader_receipt = match leader_node.execute_transaction(leader_tx).await { Ok(receipt) => receipt, Err(error) => { tracing::error!( @@ -299,7 +318,7 @@ where return Err(error); } }; - let follower_receipt = match follower_node.execute_transaction(follower_tx) { + let follower_receipt = match follower_node.execute_transaction(follower_tx).await { Ok(receipt) => receipt, Err(error) => { tracing::error!( @@ -349,7 +368,7 @@ where let leader_key = (metadata_file_path, mode.clone(), L::config_id()); let follower_key = (metadata_file_path, mode.clone(), L::config_id()); { - let mut cache = compilation_cache.write().expect("Poisoned"); + let mut cache = compilation_cache.write().await; cache.remove(&leader_key); cache.remove(&follower_key); } @@ -361,7 +380,8 @@ where config, compilation_cache.clone(), &leader_deployed_libraries, - )?; + ) + .await?; let follower_post_link_contracts = get_or_build_contracts::( metadata, metadata_file_path, @@ -369,7 +389,8 @@ where config, compilation_cache, &follower_deployed_libraries, - )?; + ) + .await?; (leader_post_link_contracts, follower_post_link_contracts) } else { @@ -396,10 +417,10 @@ where leader_state, follower_state, ); - driver.execute() + driver.execute().await } -fn get_or_build_contracts<'a, P: Platform>( +async fn get_or_build_contracts<'a, P: Platform>( metadata: &'a Metadata, metadata_file_path: &'a Path, mode: SolcMode, @@ -408,13 +429,8 @@ fn get_or_build_contracts<'a, P: Platform>( deployed_libraries: &HashMap, ) -> anyhow::Result> { let key = (metadata_file_path, mode.clone(), P::config_id()); - if let Some(compilation_artifact) = compilation_cache - .read() - .expect("Poisoned") - .get(&key) - .cloned() - { - let mut compilation_artifact = compilation_artifact.lock().expect("Poisoned"); + if let Some(compilation_artifact) = compilation_cache.read().await.get(&key).cloned() { + let mut compilation_artifact = compilation_artifact.lock().await; match *compilation_artifact { Some(ref compiled_contracts) => { tracing::debug!(?key, "Compiled contracts cache hit"); @@ -422,12 +438,9 @@ fn get_or_build_contracts<'a, P: Platform>( } None => { tracing::debug!(?key, "Compiled contracts cache miss"); - let compiled_contracts = Arc::new(compile_contracts::

( - metadata, - &mode, - config, - deployed_libraries, - )?); + let compiled_contracts = Arc::new( + compile_contracts::

(metadata, &mode, config, deployed_libraries).await?, + ); *compilation_artifact = Some(compiled_contracts.clone()); return Ok(compiled_contracts.clone()); } @@ -436,23 +449,19 @@ fn get_or_build_contracts<'a, P: Platform>( tracing::debug!(?key, "Compiled contracts cache miss"); let mutex = { - let mut compilation_cache = compilation_cache.write().expect("Poisoned"); + let mut compilation_cache = compilation_cache.write().await; let mutex = Arc::new(Mutex::new(None)); compilation_cache.insert(key, mutex.clone()); mutex }; - let mut compilation_artifact = mutex.lock().expect("Poisoned"); - let compiled_contracts = Arc::new(compile_contracts::

( - metadata, - &mode, - config, - deployed_libraries, - )?); + let mut compilation_artifact = mutex.lock().await; + let compiled_contracts = + Arc::new(compile_contracts::

(metadata, &mode, config, deployed_libraries).await?); *compilation_artifact = Some(compiled_contracts.clone()); Ok(compiled_contracts.clone()) } -fn compile_contracts( +async fn compile_contracts( metadata: &Metadata, mode: &SolcMode, config: &Arguments, @@ -489,18 +498,22 @@ fn compile_contracts( }); } - let compiler_output = compiler.try_build(compiler_path)?; + let compiler_output = compiler.try_build(compiler_path).await?; Ok((compiler_version, compiler_output)) } -fn execute_corpus(args: &Arguments, tests: &[MetadataFile], span: Span) -> anyhow::Result<()> { +async fn execute_corpus( + args: &Arguments, + tests: &[MetadataFile], + span: Span, +) -> anyhow::Result<()> { match (&args.leader, &args.follower) { (TestingPlatform::Geth, TestingPlatform::Kitchensink) => { - run_driver::(args, tests, span)? + run_driver::(args, tests, span).await? } (TestingPlatform::Geth, TestingPlatform::Geth) => { - run_driver::(args, tests, span)? + run_driver::(args, tests, span).await? } _ => unimplemented!(), } @@ -508,27 +521,41 @@ fn execute_corpus(args: &Arguments, tests: &[MetadataFile], span: Span) -> anyho Ok(()) } -fn compile_corpus(config: &Arguments, tests: &[MetadataFile], platform: &TestingPlatform, _: Span) { - tests.par_iter().for_each(|metadata| { - for mode in &metadata.solc_modes() { +async fn compile_corpus( + config: &Arguments, + tests: &[MetadataFile], + platform: &TestingPlatform, + _: Span, +) { + let tests = tests.iter().flat_map(|metadata| { + metadata + .solc_modes() + .into_iter() + .map(move |solc_mode| (metadata, solc_mode)) + }); + + futures::stream::iter(tests) + .for_each_concurrent(None, |(metadata, mode)| async move { match platform { TestingPlatform::Geth => { let _ = compile_contracts::( &metadata.content, - mode, + &mode, config, &Default::default(), - ); + ) + .await; } TestingPlatform::Kitchensink => { let _ = compile_contracts::( &metadata.content, - mode, + &mode, config, &Default::default(), - ); + ) + .await; } - }; - } - }); + } + }) + .await; } diff --git a/crates/format/Cargo.toml b/crates/format/Cargo.toml index f5150ac..48d754b 100644 --- a/crates/format/Cargo.toml +++ b/crates/format/Cargo.toml @@ -19,3 +19,6 @@ tracing = { workspace = true } semver = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } diff --git a/crates/format/src/input.rs b/crates/format/src/input.rs index 8031553..7eebf7f 100644 --- a/crates/format/src/input.rs +++ b/crates/format/src/input.rs @@ -198,7 +198,7 @@ impl Input { .ok_or_else(|| anyhow::anyhow!("instance {instance:?} not deployed")) } - pub fn encoded_input<'a>( + pub async fn encoded_input<'a>( &'a self, deployed_contracts: &HashMap, variables: impl Into>> + Clone, @@ -206,9 +206,10 @@ impl Input { ) -> anyhow::Result { match self.method { Method::Deployer | Method::Fallback => { - let calldata = - self.calldata - .calldata(deployed_contracts, variables, chain_state_provider)?; + let calldata = self + .calldata + .calldata(deployed_contracts, variables, chain_state_provider) + .await?; Ok(calldata.into()) } @@ -254,12 +255,14 @@ impl Input { // a new buffer for each one of the resolved arguments. let mut calldata = Vec::::with_capacity(4 + self.calldata.size_requirement()); calldata.extend(function.selector().0); - self.calldata.calldata_into_slice( - &mut calldata, - deployed_contracts, - variables, - chain_state_provider, - )?; + self.calldata + .calldata_into_slice( + &mut calldata, + deployed_contracts, + variables, + chain_state_provider, + ) + .await?; Ok(calldata.into()) } @@ -267,13 +270,15 @@ impl Input { } /// Parse this input into a legacy transaction. - pub fn legacy_transaction<'a>( + pub async fn legacy_transaction<'a>( &'a self, deployed_contracts: &HashMap, variables: impl Into>> + Clone, chain_state_provider: &impl ResolverApi, ) -> anyhow::Result { - let input_data = self.encoded_input(deployed_contracts, variables, chain_state_provider)?; + let input_data = self + .encoded_input(deployed_contracts, variables, chain_state_provider) + .await?; let transaction_request = TransactionRequest::default().from(self.caller).value( self.value .map(|value| value.into_inner()) @@ -351,7 +356,7 @@ impl Calldata { } } - pub fn calldata<'a>( + pub async fn calldata<'a>( &'a self, deployed_contracts: &HashMap, variables: impl Into>> + Clone, @@ -363,11 +368,12 @@ impl Calldata { deployed_contracts, variables, chain_state_provider, - )?; + ) + .await?; Ok(buffer) } - pub fn calldata_into_slice<'a>( + pub async fn calldata_into_slice<'a>( &'a self, buffer: &mut Vec, deployed_contracts: &HashMap, @@ -380,7 +386,10 @@ impl Calldata { } Calldata::Compound(items) => { for (arg_idx, arg) in items.iter().enumerate() { - match arg.resolve(deployed_contracts, variables.clone(), chain_state_provider) { + match arg + .resolve(deployed_contracts, variables.clone(), chain_state_provider) + .await + { Ok(resolved) => { buffer.extend(resolved.to_be_bytes::<32>()); } @@ -403,7 +412,7 @@ impl Calldata { } /// Checks if this [`Calldata`] is equivalent to the passed calldata bytes. - pub fn is_equivalent<'a>( + pub async fn is_equivalent<'a>( &'a self, other: &[u8], deployed_contracts: &HashMap, @@ -430,8 +439,9 @@ impl Calldata { std::borrow::Cow::Borrowed(other) }; - let this = - this.resolve(deployed_contracts, variables.clone(), chain_state_provider)?; + let this = this + .resolve(deployed_contracts, variables.clone(), chain_state_provider) + .await?; let other = U256::from_be_slice(&other); if this != other { return Ok(false); @@ -444,7 +454,7 @@ impl Calldata { } impl CalldataItem { - fn resolve<'a>( + async fn resolve<'a>( &'a self, deployed_contracts: &HashMap, variables: impl Into>> + Clone, @@ -456,7 +466,7 @@ impl CalldataItem { .calldata_tokens() .map(|token| token.resolve(deployed_contracts, variables.clone(), chain_state_provider)) { - let token = token?; + let token = token.await?; let new_token = match token { CalldataToken::Item(_) => token, CalldataToken::Operation(operation) => { @@ -555,7 +565,7 @@ impl> CalldataToken { /// This piece of code is taken from the matter-labs-tester repository which is licensed under /// MIT or Apache. The original source code can be found here: /// https://github.com/matter-labs/era-compiler-tester/blob/0ed598a27f6eceee7008deab3ff2311075a2ec69/compiler_tester/src/test/case/input/value.rs#L43-L146 - fn resolve<'a>( + async fn resolve<'a>( self, deployed_contracts: &HashMap, variables: impl Into>> + Clone, @@ -589,18 +599,22 @@ impl> CalldataToken { anyhow::anyhow!("Invalid hexadecimal literal: {}", error) })?) } else if item == Self::CHAIN_VARIABLE { - let chain_id = chain_state_provider.chain_id()?; + let chain_id = chain_state_provider.chain_id().await?; Ok(U256::from(chain_id)) } else if item == Self::GAS_LIMIT_VARIABLE { - let gas_limit = - chain_state_provider.block_gas_limit(BlockNumberOrTag::Latest)?; + let gas_limit = chain_state_provider + .block_gas_limit(BlockNumberOrTag::Latest) + .await?; Ok(U256::from(gas_limit)) } else if item == Self::COINBASE_VARIABLE { - let coinbase = chain_state_provider.block_coinbase(BlockNumberOrTag::Latest)?; + let coinbase = chain_state_provider + .block_coinbase(BlockNumberOrTag::Latest) + .await?; Ok(U256::from_be_slice(coinbase.as_ref())) } else if item == Self::DIFFICULTY_VARIABLE { - let block_difficulty = - chain_state_provider.block_difficulty(BlockNumberOrTag::Latest)?; + let block_difficulty = chain_state_provider + .block_difficulty(BlockNumberOrTag::Latest) + .await?; Ok(block_difficulty) } else if item.starts_with(Self::BLOCK_HASH_VARIABLE_PREFIX) { let offset: u64 = item @@ -609,19 +623,21 @@ impl> CalldataToken { .and_then(|value| value.parse().ok()) .unwrap_or_default(); - let current_block_number = chain_state_provider.last_block_number()?; + let current_block_number = chain_state_provider.last_block_number().await?; let desired_block_number = current_block_number - offset; - let block_hash = - chain_state_provider.block_hash(desired_block_number.into())?; + let block_hash = chain_state_provider + .block_hash(desired_block_number.into()) + .await?; Ok(U256::from_be_bytes(block_hash.0)) } else if item == Self::BLOCK_NUMBER_VARIABLE { - let current_block_number = chain_state_provider.last_block_number()?; + let current_block_number = chain_state_provider.last_block_number().await?; Ok(U256::from(current_block_number)) } else if item == Self::BLOCK_TIMESTAMP_VARIABLE { - let timestamp = - chain_state_provider.block_timestamp(BlockNumberOrTag::Latest)?; + let timestamp = chain_state_provider + .block_timestamp(BlockNumberOrTag::Latest) + .await?; Ok(U256::from(timestamp)) } else if let Some(variable_name) = item.strip_prefix(Self::VARIABLE_PREFIX) { let Some(variables) = variables.into() else { @@ -682,43 +698,46 @@ mod tests { struct MockResolver; impl ResolverApi for MockResolver { - fn chain_id(&self) -> anyhow::Result { + async fn chain_id(&self) -> anyhow::Result { Ok(0x123) } - fn block_gas_limit(&self, _: alloy::eips::BlockNumberOrTag) -> anyhow::Result { + async fn block_gas_limit(&self, _: alloy::eips::BlockNumberOrTag) -> anyhow::Result { Ok(0x1234) } - fn block_coinbase(&self, _: alloy::eips::BlockNumberOrTag) -> anyhow::Result

{ + async fn block_coinbase( + &self, + _: alloy::eips::BlockNumberOrTag, + ) -> anyhow::Result
{ Ok(Address::ZERO) } - fn block_difficulty(&self, _: alloy::eips::BlockNumberOrTag) -> anyhow::Result { + async fn block_difficulty(&self, _: alloy::eips::BlockNumberOrTag) -> anyhow::Result { Ok(U256::from(0x12345u128)) } - fn block_hash( + async fn block_hash( &self, _: alloy::eips::BlockNumberOrTag, ) -> anyhow::Result { Ok([0xEE; 32].into()) } - fn block_timestamp( + async fn block_timestamp( &self, _: alloy::eips::BlockNumberOrTag, ) -> anyhow::Result { Ok(0x123456) } - fn last_block_number(&self) -> anyhow::Result { + async fn last_block_number(&self) -> anyhow::Result { Ok(0x1234567) } } - #[test] - fn test_encoded_input_uint256() { + #[tokio::test] + async fn test_encoded_input_uint256() { let raw_metadata = r#" [ { @@ -755,6 +774,7 @@ mod tests { let encoded = input .encoded_input(&contracts, None, &MockResolver) + .await .unwrap(); assert!(encoded.0.starts_with(&selector)); @@ -763,8 +783,8 @@ mod tests { assert_eq!(decoded.0, 42); } - #[test] - fn test_encoded_input_address_with_signature() { + #[tokio::test] + async fn test_encoded_input_address_with_signature() { let raw_abi = r#"[ { "inputs": [{"name": "recipient", "type": "address"}], @@ -799,6 +819,7 @@ mod tests { let encoded = input .encoded_input(&contracts, None, &MockResolver) + .await .unwrap(); assert!(encoded.0.starts_with(&selector)); @@ -810,8 +831,8 @@ mod tests { ); } - #[test] - fn test_encoded_input_address() { + #[tokio::test] + async fn test_encoded_input_address() { let raw_abi = r#"[ { "inputs": [{"name": "recipient", "type": "address"}], @@ -846,6 +867,7 @@ mod tests { let encoded = input .encoded_input(&contracts, None, &MockResolver) + .await .unwrap(); assert!(encoded.0.starts_with(&selector)); @@ -857,50 +879,57 @@ mod tests { ); } - fn resolve_calldata_item( + async fn resolve_calldata_item( input: &str, deployed_contracts: &HashMap, chain_state_provider: &impl ResolverApi, ) -> anyhow::Result { - CalldataItem::new(input).resolve(deployed_contracts, None, chain_state_provider) + CalldataItem::new(input) + .resolve(deployed_contracts, None, chain_state_provider) + .await } - #[test] - fn resolver_can_resolve_chain_id_variable() { + #[tokio::test] + async fn resolver_can_resolve_chain_id_variable() { // Arrange let input = "$CHAIN_ID"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert let resolved = resolved.expect("Failed to resolve argument"); - assert_eq!(resolved, U256::from(MockResolver.chain_id().unwrap())) + assert_eq!(resolved, U256::from(MockResolver.chain_id().await.unwrap())) } - #[test] - fn resolver_can_resolve_gas_limit_variable() { + #[tokio::test] + async fn resolver_can_resolve_gas_limit_variable() { // Arrange let input = "$GAS_LIMIT"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert let resolved = resolved.expect("Failed to resolve argument"); assert_eq!( resolved, - U256::from(MockResolver.block_gas_limit(Default::default()).unwrap()) + U256::from( + MockResolver + .block_gas_limit(Default::default()) + .await + .unwrap() + ) ) } - #[test] - fn resolver_can_resolve_coinbase_variable() { + #[tokio::test] + async fn resolver_can_resolve_coinbase_variable() { // Arrange let input = "$COINBASE"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert let resolved = resolved.expect("Failed to resolve argument"); @@ -909,163 +938,172 @@ mod tests { U256::from_be_slice( MockResolver .block_coinbase(Default::default()) + .await .unwrap() .as_ref() ) ) } - #[test] - fn resolver_can_resolve_block_difficulty_variable() { + #[tokio::test] + async fn resolver_can_resolve_block_difficulty_variable() { // Arrange let input = "$DIFFICULTY"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert let resolved = resolved.expect("Failed to resolve argument"); assert_eq!( resolved, - MockResolver.block_difficulty(Default::default()).unwrap() + MockResolver + .block_difficulty(Default::default()) + .await + .unwrap() ) } - #[test] - fn resolver_can_resolve_block_hash_variable() { + #[tokio::test] + async fn resolver_can_resolve_block_hash_variable() { // Arrange let input = "$BLOCK_HASH"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert let resolved = resolved.expect("Failed to resolve argument"); assert_eq!( resolved, - U256::from_be_bytes(MockResolver.block_hash(Default::default()).unwrap().0) + U256::from_be_bytes(MockResolver.block_hash(Default::default()).await.unwrap().0) ) } - #[test] - fn resolver_can_resolve_block_number_variable() { + #[tokio::test] + async fn resolver_can_resolve_block_number_variable() { // Arrange let input = "$BLOCK_NUMBER"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert let resolved = resolved.expect("Failed to resolve argument"); assert_eq!( resolved, - U256::from(MockResolver.last_block_number().unwrap()) + U256::from(MockResolver.last_block_number().await.unwrap()) ) } - #[test] - fn resolver_can_resolve_block_timestamp_variable() { + #[tokio::test] + async fn resolver_can_resolve_block_timestamp_variable() { // Arrange let input = "$BLOCK_TIMESTAMP"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert let resolved = resolved.expect("Failed to resolve argument"); assert_eq!( resolved, - U256::from(MockResolver.block_timestamp(Default::default()).unwrap()) + U256::from( + MockResolver + .block_timestamp(Default::default()) + .await + .unwrap() + ) ) } - #[test] - fn simple_addition_can_be_resolved() { + #[tokio::test] + async fn simple_addition_can_be_resolved() { // Arrange let input = "2 4 +"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert let resolved = resolved.expect("Failed to resolve argument"); assert_eq!(resolved, U256::from(6)); } - #[test] - fn simple_subtraction_can_be_resolved() { + #[tokio::test] + async fn simple_subtraction_can_be_resolved() { // Arrange let input = "4 2 -"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert let resolved = resolved.expect("Failed to resolve argument"); assert_eq!(resolved, U256::from(2)); } - #[test] - fn simple_multiplication_can_be_resolved() { + #[tokio::test] + async fn simple_multiplication_can_be_resolved() { // Arrange let input = "4 2 *"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert let resolved = resolved.expect("Failed to resolve argument"); assert_eq!(resolved, U256::from(8)); } - #[test] - fn simple_division_can_be_resolved() { + #[tokio::test] + async fn simple_division_can_be_resolved() { // Arrange let input = "4 2 /"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert let resolved = resolved.expect("Failed to resolve argument"); assert_eq!(resolved, U256::from(2)); } - #[test] - fn arithmetic_errors_are_not_panics() { + #[tokio::test] + async fn arithmetic_errors_are_not_panics() { // Arrange let input = "4 0 /"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert assert!(resolved.is_err()) } - #[test] - fn arithmetic_with_resolution_works() { + #[tokio::test] + async fn arithmetic_with_resolution_works() { // Arrange let input = "$BLOCK_NUMBER 10 +"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert let resolved = resolved.expect("Failed to resolve argument"); assert_eq!( resolved, - U256::from(MockResolver.last_block_number().unwrap() + 10) + U256::from(MockResolver.last_block_number().await.unwrap() + 10) ); } - #[test] - fn incorrect_number_of_arguments_errors() { + #[tokio::test] + async fn incorrect_number_of_arguments_errors() { // Arrange let input = "$BLOCK_NUMBER 10 + +"; // Act - let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver); + let resolved = resolve_calldata_item(input, &Default::default(), &MockResolver).await; // Assert assert!(resolved.is_err()) diff --git a/crates/format/src/traits.rs b/crates/format/src/traits.rs index c22f24a..3d302f9 100644 --- a/crates/format/src/traits.rs +++ b/crates/format/src/traits.rs @@ -6,25 +6,28 @@ use anyhow::Result; /// crate implements to go from string calldata and into the bytes calldata. pub trait ResolverApi { /// Returns the ID of the chain that the node is on. - fn chain_id(&self) -> Result; + fn chain_id(&self) -> impl Future>; // TODO: This is currently a u128 due to Kitchensink needing more than 64 bits for its gas limit // when we implement the changes to the gas we need to adjust this to be a u64. /// Returns the gas limit of the specified block. - fn block_gas_limit(&self, number: BlockNumberOrTag) -> Result; + fn block_gas_limit(&self, number: BlockNumberOrTag) -> impl Future>; /// Returns the coinbase of the specified block. - fn block_coinbase(&self, number: BlockNumberOrTag) -> Result
; + fn block_coinbase(&self, number: BlockNumberOrTag) -> impl Future>; /// Returns the difficulty of the specified block. - fn block_difficulty(&self, number: BlockNumberOrTag) -> Result; + fn block_difficulty(&self, number: BlockNumberOrTag) -> impl Future>; /// Returns the hash of the specified block. - fn block_hash(&self, number: BlockNumberOrTag) -> Result; + fn block_hash(&self, number: BlockNumberOrTag) -> impl Future>; /// Returns the timestamp of the specified block, - fn block_timestamp(&self, number: BlockNumberOrTag) -> Result; + fn block_timestamp( + &self, + number: BlockNumberOrTag, + ) -> impl Future>; /// Returns the number of the last block. - fn last_block_number(&self) -> Result; + fn last_block_number(&self) -> impl Future>; } diff --git a/crates/node-interaction/src/lib.rs b/crates/node-interaction/src/lib.rs index 130d804..791ba4b 100644 --- a/crates/node-interaction/src/lib.rs +++ b/crates/node-interaction/src/lib.rs @@ -7,15 +7,18 @@ use anyhow::Result; /// An interface for all interactions with Ethereum compatible nodes. pub trait EthereumNode { /// Execute the [TransactionRequest] and return a [TransactionReceipt]. - fn execute_transaction(&self, transaction: TransactionRequest) -> Result; + fn execute_transaction( + &self, + transaction: TransactionRequest, + ) -> impl Future>; /// Trace the transaction in the [TransactionReceipt] and return a [GethTrace]. fn trace_transaction( &self, receipt: &TransactionReceipt, trace_options: GethDebugTracingOptions, - ) -> Result; + ) -> impl Future>; /// Returns the state diff of the transaction hash in the [TransactionReceipt]. - fn state_diff(&self, receipt: &TransactionReceipt) -> Result; + fn state_diff(&self, receipt: &TransactionReceipt) -> impl Future>; } diff --git a/crates/node/src/geth.rs b/crates/node/src/geth.rs index 9795893..6dc008c 100644 --- a/crates/node/src/geth.rs +++ b/crates/node/src/geth.rs @@ -25,7 +25,6 @@ use alloy::{ }, signers::local::PrivateKeySigner, }; -use revive_dt_common::concepts::BlockingExecutor; use revive_dt_config::Arguments; use revive_dt_format::traits::ResolverApi; use revive_dt_node_interaction::EthereumNode; @@ -246,106 +245,102 @@ impl Instance { impl EthereumNode for Instance { #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn execute_transaction( + async fn execute_transaction( &self, transaction: TransactionRequest, ) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - let outer_span = tracing::debug_span!("Submitting transaction", ?transaction); - let _outer_guard = outer_span.enter(); + let outer_span = tracing::debug_span!("Submitting transaction", ?transaction); + let _outer_guard = outer_span.enter(); - let provider = provider.await?; + let provider = self.provider().await?; - let pending_transaction = provider.send_transaction(transaction).await?; - let transaction_hash = pending_transaction.tx_hash(); + let pending_transaction = provider.send_transaction(transaction).await?; + let transaction_hash = pending_transaction.tx_hash(); - let span = tracing::info_span!("Awaiting transaction receipt", ?transaction_hash); - let _guard = span.enter(); + let span = tracing::info_span!("Awaiting transaction receipt", ?transaction_hash); + let _guard = span.enter(); - // The following is a fix for the "transaction indexing is in progress" error that we - // used to get. You can find more information on this in the following GH issue in geth - // https://github.com/ethereum/go-ethereum/issues/28877. To summarize what's going on, - // before we can get the receipt of the transaction it needs to have been indexed by the - // node's indexer. Just because the transaction has been confirmed it doesn't mean that - // it has been indexed. When we call alloy's `get_receipt` it checks if the transaction - // was confirmed. If it has been, then it will call `eth_getTransactionReceipt` method - // which _might_ return the above error if the tx has not yet been indexed yet. So, we - // need to implement a retry mechanism for the receipt to keep retrying to get it until - // it eventually works, but we only do that if the error we get back is the "transaction - // indexing is in progress" error or if the receipt is None. - // - // Getting the transaction indexed and taking a receipt can take a long time especially - // when a lot of transactions are being submitted to the node. Thus, while initially we - // only allowed for 60 seconds of waiting with a 1 second delay in polling, we need to - // allow for a larger wait time. Therefore, in here we allow for 5 minutes of waiting - // with exponential backoff each time we attempt to get the receipt and find that it's - // not available. - let mut retries = 0; - let mut total_wait_duration = Duration::from_secs(0); - let max_allowed_wait_duration = Duration::from_secs(5 * 60); - loop { - if total_wait_duration >= max_allowed_wait_duration { - tracing::error!( - ?total_wait_duration, - ?max_allowed_wait_duration, - retry_count = retries, - "Failed to get receipt after polling for it" - ); - anyhow::bail!( - "Polled for receipt for {total_wait_duration:?} but failed to get it" - ); - } - - match provider.get_transaction_receipt(*transaction_hash).await { - Ok(Some(receipt)) => { - tracing::info!(?total_wait_duration, "Found receipt"); - break Ok(receipt); - } - Ok(None) => {} - Err(error) => { - let error_string = error.to_string(); - if !error_string.contains(Self::TRANSACTION_INDEXING_ERROR) { - break Err(error.into()); - } - } - }; - - let next_wait_duration = Duration::from_secs(2u64.pow(retries)) - .min(max_allowed_wait_duration - total_wait_duration); - total_wait_duration += next_wait_duration; - retries += 1; - - tokio::time::sleep(next_wait_duration).await; + // The following is a fix for the "transaction indexing is in progress" error that we + // used to get. You can find more information on this in the following GH issue in geth + // https://github.com/ethereum/go-ethereum/issues/28877. To summarize what's going on, + // before we can get the receipt of the transaction it needs to have been indexed by the + // node's indexer. Just because the transaction has been confirmed it doesn't mean that + // it has been indexed. When we call alloy's `get_receipt` it checks if the transaction + // was confirmed. If it has been, then it will call `eth_getTransactionReceipt` method + // which _might_ return the above error if the tx has not yet been indexed yet. So, we + // need to implement a retry mechanism for the receipt to keep retrying to get it until + // it eventually works, but we only do that if the error we get back is the "transaction + // indexing is in progress" error or if the receipt is None. + // + // Getting the transaction indexed and taking a receipt can take a long time especially + // when a lot of transactions are being submitted to the node. Thus, while initially we + // only allowed for 60 seconds of waiting with a 1 second delay in polling, we need to + // allow for a larger wait time. Therefore, in here we allow for 5 minutes of waiting + // with exponential backoff each time we attempt to get the receipt and find that it's + // not available. + let mut retries = 0; + let mut total_wait_duration = Duration::from_secs(0); + let max_allowed_wait_duration = Duration::from_secs(5 * 60); + loop { + if total_wait_duration >= max_allowed_wait_duration { + tracing::error!( + ?total_wait_duration, + ?max_allowed_wait_duration, + retry_count = retries, + "Failed to get receipt after polling for it" + ); + anyhow::bail!( + "Polled for receipt for {total_wait_duration:?} but failed to get it" + ); } - })? + + match provider.get_transaction_receipt(*transaction_hash).await { + Ok(Some(receipt)) => { + tracing::info!(?total_wait_duration, "Found receipt"); + break Ok(receipt); + } + Ok(None) => {} + Err(error) => { + let error_string = error.to_string(); + if !error_string.contains(Self::TRANSACTION_INDEXING_ERROR) { + break Err(error.into()); + } + } + }; + + let next_wait_duration = Duration::from_secs(2u64.pow(retries)) + .min(max_allowed_wait_duration - total_wait_duration); + total_wait_duration += next_wait_duration; + retries += 1; + + tokio::time::sleep(next_wait_duration).await; + } } #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn trace_transaction( + async fn trace_transaction( &self, transaction: &TransactionReceipt, trace_options: GethDebugTracingOptions, ) -> anyhow::Result { let tx_hash = transaction.transaction_hash; - let provider = self.provider(); - BlockingExecutor::execute(async move { - Ok(provider - .await? - .debug_trace_transaction(tx_hash, trace_options) - .await?) - })? + Ok(self + .provider() + .await? + .debug_trace_transaction(tx_hash, trace_options) + .await?) } #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn state_diff(&self, transaction: &TransactionReceipt) -> anyhow::Result { + async fn state_diff(&self, transaction: &TransactionReceipt) -> anyhow::Result { let trace_options = GethDebugTracingOptions::prestate_tracer(PreStateConfig { diff_mode: Some(true), disable_code: None, disable_storage: None, }); match self - .trace_transaction(transaction, trace_options)? + .trace_transaction(transaction, trace_options) + .await? .try_into_pre_state_frame()? { PreStateFrame::Diff(diff) => Ok(diff), @@ -356,84 +351,71 @@ impl EthereumNode for Instance { impl ResolverApi for Instance { #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn chain_id(&self) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider.await?.get_chain_id().await.map_err(Into::into) - })? + async fn chain_id(&self) -> anyhow::Result { + self.provider() + .await? + .get_chain_id() + .await + .map_err(Into::into) } #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn block_gas_limit(&self, number: BlockNumberOrTag) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider - .await? - .get_block_by_number(number) - .await? - .ok_or(anyhow::Error::msg("Blockchain has no blocks")) - .map(|block| block.header.gas_limit as _) - })? + async fn block_gas_limit(&self, number: BlockNumberOrTag) -> anyhow::Result { + self.provider() + .await? + .get_block_by_number(number) + .await? + .ok_or(anyhow::Error::msg("Blockchain has no blocks")) + .map(|block| block.header.gas_limit as _) } #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn block_coinbase(&self, number: BlockNumberOrTag) -> anyhow::Result
{ - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider - .await? - .get_block_by_number(number) - .await? - .ok_or(anyhow::Error::msg("Blockchain has no blocks")) - .map(|block| block.header.beneficiary) - })? + async fn block_coinbase(&self, number: BlockNumberOrTag) -> anyhow::Result
{ + self.provider() + .await? + .get_block_by_number(number) + .await? + .ok_or(anyhow::Error::msg("Blockchain has no blocks")) + .map(|block| block.header.beneficiary) } #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn block_difficulty(&self, number: BlockNumberOrTag) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider - .await? - .get_block_by_number(number) - .await? - .ok_or(anyhow::Error::msg("Blockchain has no blocks")) - .map(|block| block.header.difficulty) - })? + async fn block_difficulty(&self, number: BlockNumberOrTag) -> anyhow::Result { + self.provider() + .await? + .get_block_by_number(number) + .await? + .ok_or(anyhow::Error::msg("Blockchain has no blocks")) + .map(|block| block.header.difficulty) } #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn block_hash(&self, number: BlockNumberOrTag) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider - .await? - .get_block_by_number(number) - .await? - .ok_or(anyhow::Error::msg("Blockchain has no blocks")) - .map(|block| block.header.hash) - })? + async fn block_hash(&self, number: BlockNumberOrTag) -> anyhow::Result { + self.provider() + .await? + .get_block_by_number(number) + .await? + .ok_or(anyhow::Error::msg("Blockchain has no blocks")) + .map(|block| block.header.hash) } #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn block_timestamp(&self, number: BlockNumberOrTag) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider - .await? - .get_block_by_number(number) - .await? - .ok_or(anyhow::Error::msg("Blockchain has no blocks")) - .map(|block| block.header.timestamp) - })? + async fn block_timestamp(&self, number: BlockNumberOrTag) -> anyhow::Result { + self.provider() + .await? + .get_block_by_number(number) + .await? + .ok_or(anyhow::Error::msg("Blockchain has no blocks")) + .map(|block| block.header.timestamp) } #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn last_block_number(&self) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider.await?.get_block_number().await.map_err(Into::into) - })? + async fn last_block_number(&self) -> anyhow::Result { + self.provider() + .await? + .get_block_number() + .await + .map_err(Into::into) } } @@ -583,89 +565,89 @@ mod tests { ); } - #[test] - fn can_get_chain_id_from_node() { + #[tokio::test] + async fn can_get_chain_id_from_node() { // Arrange let (node, _temp_dir) = new_node(); // Act - let chain_id = node.chain_id(); + let chain_id = node.chain_id().await; // Assert let chain_id = chain_id.expect("Failed to get the chain id"); assert_eq!(chain_id, 420_420_420); } - #[test] - fn can_get_gas_limit_from_node() { + #[tokio::test] + async fn can_get_gas_limit_from_node() { // Arrange let (node, _temp_dir) = new_node(); // Act - let gas_limit = node.block_gas_limit(BlockNumberOrTag::Latest); + let gas_limit = node.block_gas_limit(BlockNumberOrTag::Latest).await; // Assert let gas_limit = gas_limit.expect("Failed to get the gas limit"); assert_eq!(gas_limit, u32::MAX as u128) } - #[test] - fn can_get_coinbase_from_node() { + #[tokio::test] + async fn can_get_coinbase_from_node() { // Arrange let (node, _temp_dir) = new_node(); // Act - let coinbase = node.block_coinbase(BlockNumberOrTag::Latest); + let coinbase = node.block_coinbase(BlockNumberOrTag::Latest).await; // Assert let coinbase = coinbase.expect("Failed to get the coinbase"); assert_eq!(coinbase, Address::new([0xFF; 20])) } - #[test] - fn can_get_block_difficulty_from_node() { + #[tokio::test] + async fn can_get_block_difficulty_from_node() { // Arrange let (node, _temp_dir) = new_node(); // Act - let block_difficulty = node.block_difficulty(BlockNumberOrTag::Latest); + let block_difficulty = node.block_difficulty(BlockNumberOrTag::Latest).await; // Assert let block_difficulty = block_difficulty.expect("Failed to get the block difficulty"); assert_eq!(block_difficulty, U256::ZERO) } - #[test] - fn can_get_block_hash_from_node() { + #[tokio::test] + async fn can_get_block_hash_from_node() { // Arrange let (node, _temp_dir) = new_node(); // Act - let block_hash = node.block_hash(BlockNumberOrTag::Latest); + let block_hash = node.block_hash(BlockNumberOrTag::Latest).await; // Assert let _ = block_hash.expect("Failed to get the block hash"); } - #[test] - fn can_get_block_timestamp_from_node() { + #[tokio::test] + async fn can_get_block_timestamp_from_node() { // Arrange let (node, _temp_dir) = new_node(); // Act - let block_timestamp = node.block_timestamp(BlockNumberOrTag::Latest); + let block_timestamp = node.block_timestamp(BlockNumberOrTag::Latest).await; // Assert let _ = block_timestamp.expect("Failed to get the block timestamp"); } - #[test] - fn can_get_block_number_from_node() { + #[tokio::test] + async fn can_get_block_number_from_node() { // Arrange let (node, _temp_dir) = new_node(); // Act - let block_number = node.last_block_number(); + let block_number = node.last_block_number().await; // Assert let block_number = block_number.expect("Failed to get the block number"); diff --git a/crates/node/src/kitchensink.rs b/crates/node/src/kitchensink.rs index ff9d652..f9faa2a 100644 --- a/crates/node/src/kitchensink.rs +++ b/crates/node/src/kitchensink.rs @@ -37,7 +37,6 @@ use sp_core::crypto::Ss58Codec; use sp_runtime::AccountId32; use tracing::Level; -use revive_dt_common::concepts::BlockingExecutor; use revive_dt_config::Arguments; use revive_dt_node_interaction::EthereumNode; @@ -377,49 +376,46 @@ impl KitchensinkNode { impl EthereumNode for KitchensinkNode { #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] - fn execute_transaction( + async fn execute_transaction( &self, transaction: alloy::rpc::types::TransactionRequest, ) -> anyhow::Result { tracing::debug!(?transaction, "Submitting transaction"); - let provider = self.provider(); - let receipt = BlockingExecutor::execute(async move { - Ok(provider - .await? - .send_transaction(transaction) - .await? - .get_receipt() - .await?) - })?; + let receipt = self + .provider() + .await? + .send_transaction(transaction) + .await? + .get_receipt() + .await?; tracing::info!(?receipt, "Submitted tx to kitchensink"); - receipt + Ok(receipt) } #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] - fn trace_transaction( + async fn trace_transaction( &self, transaction: &TransactionReceipt, trace_options: GethDebugTracingOptions, ) -> anyhow::Result { let tx_hash = transaction.transaction_hash; - let provider = self.provider(); - BlockingExecutor::execute(async move { - Ok(provider - .await? - .debug_trace_transaction(tx_hash, trace_options) - .await?) - })? + Ok(self + .provider() + .await? + .debug_trace_transaction(tx_hash, trace_options) + .await?) } #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] - fn state_diff(&self, transaction: &TransactionReceipt) -> anyhow::Result { + async fn state_diff(&self, transaction: &TransactionReceipt) -> anyhow::Result { let trace_options = GethDebugTracingOptions::prestate_tracer(PreStateConfig { diff_mode: Some(true), disable_code: None, disable_storage: None, }); match self - .trace_transaction(transaction, trace_options)? + .trace_transaction(transaction, trace_options) + .await? .try_into_pre_state_frame()? { PreStateFrame::Diff(diff) => Ok(diff), @@ -429,85 +425,72 @@ impl EthereumNode for KitchensinkNode { } impl ResolverApi for KitchensinkNode { - #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn chain_id(&self) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider.await?.get_chain_id().await.map_err(Into::into) - })? + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] + async fn chain_id(&self) -> anyhow::Result { + self.provider() + .await? + .get_chain_id() + .await + .map_err(Into::into) } - #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn block_gas_limit(&self, number: BlockNumberOrTag) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider - .await? - .get_block_by_number(number) - .await? - .ok_or(anyhow::Error::msg("Blockchain has no blocks")) - .map(|block| block.header.gas_limit) - })? + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] + async fn block_gas_limit(&self, number: BlockNumberOrTag) -> anyhow::Result { + self.provider() + .await? + .get_block_by_number(number) + .await? + .ok_or(anyhow::Error::msg("Blockchain has no blocks")) + .map(|block| block.header.gas_limit as _) } - #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn block_coinbase(&self, number: BlockNumberOrTag) -> anyhow::Result
{ - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider - .await? - .get_block_by_number(number) - .await? - .ok_or(anyhow::Error::msg("Blockchain has no blocks")) - .map(|block| block.header.beneficiary) - })? + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] + async fn block_coinbase(&self, number: BlockNumberOrTag) -> anyhow::Result
{ + self.provider() + .await? + .get_block_by_number(number) + .await? + .ok_or(anyhow::Error::msg("Blockchain has no blocks")) + .map(|block| block.header.beneficiary) } - #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn block_difficulty(&self, number: BlockNumberOrTag) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider - .await? - .get_block_by_number(number) - .await? - .ok_or(anyhow::Error::msg("Blockchain has no blocks")) - .map(|block| block.header.difficulty) - })? + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] + async fn block_difficulty(&self, number: BlockNumberOrTag) -> anyhow::Result { + self.provider() + .await? + .get_block_by_number(number) + .await? + .ok_or(anyhow::Error::msg("Blockchain has no blocks")) + .map(|block| block.header.difficulty) } - #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn block_hash(&self, number: BlockNumberOrTag) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider - .await? - .get_block_by_number(number) - .await? - .ok_or(anyhow::Error::msg("Blockchain has no blocks")) - .map(|block| block.header.hash) - })? + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] + async fn block_hash(&self, number: BlockNumberOrTag) -> anyhow::Result { + self.provider() + .await? + .get_block_by_number(number) + .await? + .ok_or(anyhow::Error::msg("Blockchain has no blocks")) + .map(|block| block.header.hash) } - #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn block_timestamp(&self, number: BlockNumberOrTag) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider - .await? - .get_block_by_number(number) - .await? - .ok_or(anyhow::Error::msg("Blockchain has no blocks")) - .map(|block| block.header.timestamp) - })? + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] + async fn block_timestamp(&self, number: BlockNumberOrTag) -> anyhow::Result { + self.provider() + .await? + .get_block_by_number(number) + .await? + .ok_or(anyhow::Error::msg("Blockchain has no blocks")) + .map(|block| block.header.timestamp) } - #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] - fn last_block_number(&self) -> anyhow::Result { - let provider = self.provider(); - BlockingExecutor::execute(async move { - provider.await?.get_block_number().await.map_err(Into::into) - })? + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] + async fn last_block_number(&self) -> anyhow::Result { + self.provider() + .await? + .get_block_number() + .await + .map_err(Into::into) } } @@ -1284,86 +1267,86 @@ mod tests { ); } - #[test] - fn can_get_chain_id_from_node() { + #[tokio::test] + async fn can_get_chain_id_from_node() { // Arrange let node = shared_node(); // Act - let chain_id = node.chain_id(); + let chain_id = node.chain_id().await; // Assert let chain_id = chain_id.expect("Failed to get the chain id"); assert_eq!(chain_id, 420_420_420); } - #[test] - fn can_get_gas_limit_from_node() { + #[tokio::test] + async fn can_get_gas_limit_from_node() { // Arrange let node = shared_node(); // Act - let gas_limit = node.block_gas_limit(BlockNumberOrTag::Latest); + let gas_limit = node.block_gas_limit(BlockNumberOrTag::Latest).await; // Assert let _ = gas_limit.expect("Failed to get the gas limit"); } - #[test] - fn can_get_coinbase_from_node() { + #[tokio::test] + async fn can_get_coinbase_from_node() { // Arrange let node = shared_node(); // Act - let coinbase = node.block_coinbase(BlockNumberOrTag::Latest); + let coinbase = node.block_coinbase(BlockNumberOrTag::Latest).await; // Assert let _ = coinbase.expect("Failed to get the coinbase"); } - #[test] - fn can_get_block_difficulty_from_node() { + #[tokio::test] + async fn can_get_block_difficulty_from_node() { // Arrange let node = shared_node(); // Act - let block_difficulty = node.block_difficulty(BlockNumberOrTag::Latest); + let block_difficulty = node.block_difficulty(BlockNumberOrTag::Latest).await; // Assert let _ = block_difficulty.expect("Failed to get the block difficulty"); } - #[test] - fn can_get_block_hash_from_node() { + #[tokio::test] + async fn can_get_block_hash_from_node() { // Arrange let node = shared_node(); // Act - let block_hash = node.block_hash(BlockNumberOrTag::Latest); + let block_hash = node.block_hash(BlockNumberOrTag::Latest).await; // Assert let _ = block_hash.expect("Failed to get the block hash"); } - #[test] - fn can_get_block_timestamp_from_node() { + #[tokio::test] + async fn can_get_block_timestamp_from_node() { // Arrange let node = shared_node(); // Act - let block_timestamp = node.block_timestamp(BlockNumberOrTag::Latest); + let block_timestamp = node.block_timestamp(BlockNumberOrTag::Latest).await; // Assert let _ = block_timestamp.expect("Failed to get the block timestamp"); } - #[test] - fn can_get_block_number_from_node() { + #[tokio::test] + async fn can_get_block_number_from_node() { // Arrange let node = shared_node(); // Act - let block_number = node.last_block_number(); + let block_number = node.last_block_number().await; // Assert let _ = block_number.expect("Failed to get the block number");