Make codebase async

This commit is contained in:
Omar Abdulla
2025-08-01 11:11:50 +03:00
parent 11bba5add5
commit 390402b6cc
18 changed files with 679 additions and 794 deletions
+94 -71
View File
@@ -67,24 +67,31 @@ where
}
}
pub fn handle_input(
pub async fn handle_input(
&mut self,
metadata: &Metadata,
case_idx: CaseIdx,
input: &Input,
node: &T::Blockchain,
) -> anyhow::Result<(TransactionReceipt, GethTrace, DiffMode)> {
let deployment_receipts =
self.handle_contract_deployment(metadata, case_idx, input, node)?;
let execution_receipt = self.handle_input_execution(input, deployment_receipts, node)?;
let tracing_result = self.handle_input_call_frame_tracing(&execution_receipt, node)?;
let deployment_receipts = self
.handle_contract_deployment(metadata, case_idx, input, node)
.await?;
let execution_receipt = self
.handle_input_execution(input, deployment_receipts, node)
.await?;
let tracing_result = self
.handle_input_call_frame_tracing(&execution_receipt, node)
.await?;
self.handle_input_variable_assignment(input, &tracing_result)?;
self.handle_input_expectations(input, &execution_receipt, node, &tracing_result)?;
self.handle_input_expectations(input, &execution_receipt, node, &tracing_result)
.await?;
self.handle_input_diff(case_idx, execution_receipt, node)
.await
}
/// Handles the contract deployment for a given input performing it if it needs to be performed.
fn handle_contract_deployment(
async fn handle_contract_deployment(
&mut self,
metadata: &Metadata,
case_idx: CaseIdx,
@@ -121,14 +128,17 @@ where
.then_some(input.value)
.flatten();
if let (_, _, Some(receipt)) = self.get_or_deploy_contract_instance(
&instance,
metadata,
input.caller,
calldata,
value,
node,
)? {
if let (_, _, Some(receipt)) = self
.get_or_deploy_contract_instance(
&instance,
metadata,
input.caller,
calldata,
value,
node,
)
.await?
{
receipts.insert(instance.clone(), receipt);
}
}
@@ -137,7 +147,7 @@ where
}
/// Handles the execution of the input in terms of the calls that need to be made.
fn handle_input_execution(
async fn handle_input_execution(
&mut self,
input: &Input,
mut deployment_receipts: HashMap<ContractInstance, TransactionReceipt>,
@@ -150,22 +160,23 @@ where
.remove(&input.instance)
.context("Failed to find deployment receipt"),
Method::Fallback | Method::FunctionName(_) => {
let tx =
match input.legacy_transaction(&self.deployed_contracts, &self.variables, node)
{
Ok(tx) => {
tracing::debug!("Legacy transaction data: {tx:#?}");
tx
}
Err(err) => {
tracing::error!("Failed to construct legacy transaction: {err:?}");
return Err(err);
}
};
let tx = match input
.legacy_transaction(&self.deployed_contracts, &self.variables, node)
.await
{
Ok(tx) => {
tracing::debug!("Legacy transaction data: {tx:#?}");
tx
}
Err(err) => {
tracing::error!("Failed to construct legacy transaction: {err:?}");
return Err(err);
}
};
tracing::trace!("Executing transaction for input: {input:?}");
match node.execute_transaction(tx) {
match node.execute_transaction(tx).await {
Ok(receipt) => Ok(receipt),
Err(err) => {
tracing::error!(
@@ -180,7 +191,7 @@ where
}
}
fn handle_input_call_frame_tracing(
async fn handle_input_call_frame_tracing(
&self,
execution_receipt: &TransactionReceipt,
node: &T::Blockchain,
@@ -194,6 +205,7 @@ where
..Default::default()
},
)
.await
.map(|trace| {
trace
.try_into_call_frame()
@@ -226,7 +238,7 @@ where
Ok(())
}
fn handle_input_expectations(
async fn handle_input_expectations(
&mut self,
input: &Input,
execution_receipt: &TransactionReceipt,
@@ -270,13 +282,14 @@ where
node,
expectation,
tracing_result,
)?;
)
.await?;
}
Ok(())
}
fn handle_input_expectation_item(
async fn handle_input_expectation_item(
&mut self,
execution_receipt: &TransactionReceipt,
node: &T::Blockchain,
@@ -313,12 +326,15 @@ where
if let Some(ref expected_calldata) = expectation.return_data {
let expected = expected_calldata;
let actual = &tracing_result.output.as_ref().unwrap_or_default();
if !expected.is_equivalent(
actual,
deployed_contracts,
&*variables,
chain_state_provider,
)? {
if !expected
.is_equivalent(
actual,
deployed_contracts,
&*variables,
chain_state_provider,
)
.await?
{
tracing::error!(
?execution_receipt,
?expected,
@@ -349,7 +365,8 @@ where
if let Some(ref expected_address) = expected_event.address {
let expected = Address::from_slice(
Calldata::new_compound([expected_address])
.calldata(deployed_contracts, &*variables, node)?
.calldata(deployed_contracts, &*variables, node)
.await?
.get(12..32)
.expect("Can't fail"),
);
@@ -374,12 +391,15 @@ where
.zip(actual_event.topics())
{
let expected = Calldata::new_compound([expected]);
if !expected.is_equivalent(
&actual.0,
deployed_contracts,
&*variables,
chain_state_provider,
)? {
if !expected
.is_equivalent(
&actual.0,
deployed_contracts,
&*variables,
chain_state_provider,
)
.await?
{
tracing::error!(
?execution_receipt,
?expected,
@@ -395,12 +415,15 @@ where
// Handling the values assertion.
let expected = &expected_event.values;
let actual = &actual_event.data().data;
if !expected.is_equivalent(
&actual.0,
deployed_contracts,
&*variables,
chain_state_provider,
)? {
if !expected
.is_equivalent(
&actual.0,
deployed_contracts,
&*variables,
chain_state_provider,
)
.await?
{
tracing::error!(
?execution_receipt,
?expected,
@@ -417,7 +440,7 @@ where
Ok(())
}
fn handle_input_diff(
async fn handle_input_diff(
&mut self,
_: CaseIdx,
execution_receipt: TransactionReceipt,
@@ -432,8 +455,10 @@ where
disable_storage: None,
});
let trace = node.trace_transaction(&execution_receipt, trace_options)?;
let diff = node.state_diff(&execution_receipt)?;
let trace = node
.trace_transaction(&execution_receipt, trace_options)
.await?;
let diff = node.state_diff(&execution_receipt).await?;
Ok((execution_receipt, trace, diff))
}
@@ -444,7 +469,7 @@ where
/// If a [`CaseIdx`] is not specified then this contact instance address will be stored in the
/// cross-case deployed contracts address mapping.
#[allow(clippy::too_many_arguments)]
pub fn get_or_deploy_contract_instance(
pub async fn get_or_deploy_contract_instance(
&mut self,
contract_instance: &ContractInstance,
metadata: &Metadata,
@@ -500,7 +525,9 @@ where
};
if let Some(calldata) = calldata {
let calldata = calldata.calldata(&self.deployed_contracts, None, node)?;
let calldata = calldata
.calldata(&self.deployed_contracts, None, node)
.await?;
code.extend(calldata);
}
@@ -513,7 +540,7 @@ where
TransactionBuilder::<Ethereum>::with_deploy_code(tx, code)
};
let receipt = match node.execute_transaction(tx) {
let receipt = match node.execute_transaction(tx).await {
Ok(receipt) => receipt,
Err(error) => {
tracing::error!(
@@ -604,7 +631,7 @@ where
}
}
pub fn execute(&mut self) -> anyhow::Result<usize> {
pub async fn execute(&mut self) -> anyhow::Result<usize> {
if !self
.leader_node
.matches_target(self.metadata.targets.as_deref())
@@ -624,18 +651,14 @@ where
let tracing_span = tracing::info_span!("Handling input", input_idx);
let _guard = tracing_span.enter();
let (leader_receipt, _, leader_diff) = self.leader_state.handle_input(
self.metadata,
self.case_idx,
&input,
self.leader_node,
)?;
let (follower_receipt, _, follower_diff) = self.follower_state.handle_input(
self.metadata,
self.case_idx,
&input,
self.follower_node,
)?;
let (leader_receipt, _, leader_diff) = self
.leader_state
.handle_input(self.metadata, self.case_idx, &input, self.leader_node)
.await?;
let (follower_receipt, _, follower_diff) = self
.follower_state
.handle_input(self.metadata, self.case_idx, &input, self.follower_node)
.await?;
if leader_diff == follower_diff {
tracing::debug!("State diffs match between leader and follower.");
+121 -94
View File
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
path::Path,
sync::{Arc, LazyLock, Mutex, RwLock},
sync::{Arc, LazyLock},
};
use alloy::{
@@ -12,12 +12,13 @@ use alloy::{
};
use anyhow::Context;
use clap::Parser;
use rayon::{ThreadPoolBuilder, prelude::*};
use futures::StreamExt;
use revive_dt_common::iterators::FilesWithExtensionIterator;
use revive_dt_node_interaction::EthereumNode;
use semver::Version;
use temp_dir::TempDir;
use tracing::Level;
use tokio::sync::{Mutex, RwLock};
use tracing::{Instrument, Level};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use revive_dt_compiler::SolidityCompiler;
@@ -51,18 +52,24 @@ type CompilationCache<'a> = Arc<
fn main() -> anyhow::Result<()> {
let args = init_cli()?;
for (corpus, tests) in collect_corpora(&args)? {
let span = Span::new(corpus, args.clone())?;
match &args.compile_only {
Some(platform) => compile_corpus(&args, &tests, platform, span),
None => execute_corpus(&args, &tests, span)?,
let body = async {
for (corpus, tests) in collect_corpora(&args)? {
let span = Span::new(corpus, args.clone())?;
match &args.compile_only {
Some(platform) => compile_corpus(&args, &tests, platform, span).await,
None => execute_corpus(&args, &tests, span).await?,
}
Report::save()?;
}
Ok(())
};
Report::save()?;
}
Ok(())
tokio::runtime::Builder::new_multi_thread()
.worker_threads(args.number_of_threads)
.enable_all()
.build()
.expect("Failed building the Runtime")
.block_on(body)
}
fn init_cli() -> anyhow::Result<Arguments> {
@@ -93,10 +100,6 @@ fn init_cli() -> anyhow::Result<Arguments> {
}
tracing::info!("workdir: {}", args.directory().display());
ThreadPoolBuilder::new()
.num_threads(args.number_of_threads)
.build_global()?;
Ok(args)
}
@@ -114,7 +117,11 @@ fn collect_corpora(args: &Arguments) -> anyhow::Result<HashMap<Corpus, Vec<Metad
Ok(corpora)
}
fn run_driver<L, F>(args: &Arguments, tests: &[MetadataFile], span: Span) -> anyhow::Result<()>
async fn run_driver<L, F>(
args: &Arguments,
tests: &[MetadataFile],
span: Span,
) -> anyhow::Result<()>
where
L: Platform,
F: Platform,
@@ -146,42 +153,52 @@ where
.collect::<Vec<_>>();
let compilation_cache = Arc::new(RwLock::new(HashMap::new()));
test_cases.into_par_iter().for_each(
|(metadata_file_path, metadata, case_idx, case, solc_mode)| {
let tracing_span = tracing::span!(
Level::INFO,
"Running driver",
metadata_file_path = %metadata_file_path.display(),
case_idx = case_idx,
solc_mode = ?solc_mode,
);
let _guard = tracing_span.enter();
let result = handle_case_driver::<L, F>(
metadata_file_path.as_path(),
metadata,
case_idx.into(),
case,
solc_mode,
args,
compilation_cache.clone(),
leader_nodes.round_robbin(),
follower_nodes.round_robbin(),
span,
);
match result {
Ok(inputs_executed) => tracing::info!(inputs_executed, "Execution succeeded"),
Err(error) => tracing::info!(%error, "Execution failed"),
}
tracing::info!("Execution completed");
},
);
futures::stream::iter(test_cases)
.for_each_concurrent(
None,
|(metadata_file_path, metadata, case_idx, case, solc_mode)| {
let compilation_cache = compilation_cache.clone();
let leader_node = leader_nodes.round_robbin();
let follower_node = follower_nodes.round_robbin();
let tracing_span = tracing::span!(
Level::INFO,
"Running driver",
metadata_file_path = %metadata_file_path.display(),
case_idx = case_idx,
solc_mode = ?solc_mode,
);
async move {
let result = handle_case_driver::<L, F>(
metadata_file_path.as_path(),
metadata,
case_idx.into(),
case,
solc_mode,
args,
compilation_cache.clone(),
leader_node,
follower_node,
span,
)
.await;
match result {
Ok(inputs_executed) => {
tracing::info!(inputs_executed, "Execution succeeded")
}
Err(error) => tracing::info!(%error, "Execution failed"),
}
tracing::info!("Execution completed");
}
.instrument(tracing_span)
},
)
.await;
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn handle_case_driver<'a, L, F>(
async fn handle_case_driver<'a, L, F>(
metadata_file_path: &'a Path,
metadata: &'a Metadata,
case_idx: CaseIdx,
@@ -206,7 +223,8 @@ where
config,
compilation_cache.clone(),
&HashMap::new(),
)?;
)
.await?;
let follower_pre_link_contracts = get_or_build_contracts::<F>(
metadata,
metadata_file_path,
@@ -214,7 +232,8 @@ where
config,
compilation_cache.clone(),
&HashMap::new(),
)?;
)
.await?;
let mut leader_deployed_libraries = HashMap::new();
let mut follower_deployed_libraries = HashMap::new();
@@ -288,7 +307,7 @@ where
follower_code,
);
let leader_receipt = match leader_node.execute_transaction(leader_tx) {
let leader_receipt = match leader_node.execute_transaction(leader_tx).await {
Ok(receipt) => receipt,
Err(error) => {
tracing::error!(
@@ -299,7 +318,7 @@ where
return Err(error);
}
};
let follower_receipt = match follower_node.execute_transaction(follower_tx) {
let follower_receipt = match follower_node.execute_transaction(follower_tx).await {
Ok(receipt) => receipt,
Err(error) => {
tracing::error!(
@@ -349,7 +368,7 @@ where
let leader_key = (metadata_file_path, mode.clone(), L::config_id());
let follower_key = (metadata_file_path, mode.clone(), L::config_id());
{
let mut cache = compilation_cache.write().expect("Poisoned");
let mut cache = compilation_cache.write().await;
cache.remove(&leader_key);
cache.remove(&follower_key);
}
@@ -361,7 +380,8 @@ where
config,
compilation_cache.clone(),
&leader_deployed_libraries,
)?;
)
.await?;
let follower_post_link_contracts = get_or_build_contracts::<F>(
metadata,
metadata_file_path,
@@ -369,7 +389,8 @@ where
config,
compilation_cache,
&follower_deployed_libraries,
)?;
)
.await?;
(leader_post_link_contracts, follower_post_link_contracts)
} else {
@@ -396,10 +417,10 @@ where
leader_state,
follower_state,
);
driver.execute()
driver.execute().await
}
fn get_or_build_contracts<'a, P: Platform>(
async fn get_or_build_contracts<'a, P: Platform>(
metadata: &'a Metadata,
metadata_file_path: &'a Path,
mode: SolcMode,
@@ -408,13 +429,8 @@ fn get_or_build_contracts<'a, P: Platform>(
deployed_libraries: &HashMap<ContractInstance, (Address, JsonAbi)>,
) -> anyhow::Result<Arc<(Version, CompilerOutput)>> {
let key = (metadata_file_path, mode.clone(), P::config_id());
if let Some(compilation_artifact) = compilation_cache
.read()
.expect("Poisoned")
.get(&key)
.cloned()
{
let mut compilation_artifact = compilation_artifact.lock().expect("Poisoned");
if let Some(compilation_artifact) = compilation_cache.read().await.get(&key).cloned() {
let mut compilation_artifact = compilation_artifact.lock().await;
match *compilation_artifact {
Some(ref compiled_contracts) => {
tracing::debug!(?key, "Compiled contracts cache hit");
@@ -422,12 +438,9 @@ fn get_or_build_contracts<'a, P: Platform>(
}
None => {
tracing::debug!(?key, "Compiled contracts cache miss");
let compiled_contracts = Arc::new(compile_contracts::<P>(
metadata,
&mode,
config,
deployed_libraries,
)?);
let compiled_contracts = Arc::new(
compile_contracts::<P>(metadata, &mode, config, deployed_libraries).await?,
);
*compilation_artifact = Some(compiled_contracts.clone());
return Ok(compiled_contracts.clone());
}
@@ -436,23 +449,19 @@ fn get_or_build_contracts<'a, P: Platform>(
tracing::debug!(?key, "Compiled contracts cache miss");
let mutex = {
let mut compilation_cache = compilation_cache.write().expect("Poisoned");
let mut compilation_cache = compilation_cache.write().await;
let mutex = Arc::new(Mutex::new(None));
compilation_cache.insert(key, mutex.clone());
mutex
};
let mut compilation_artifact = mutex.lock().expect("Poisoned");
let compiled_contracts = Arc::new(compile_contracts::<P>(
metadata,
&mode,
config,
deployed_libraries,
)?);
let mut compilation_artifact = mutex.lock().await;
let compiled_contracts =
Arc::new(compile_contracts::<P>(metadata, &mode, config, deployed_libraries).await?);
*compilation_artifact = Some(compiled_contracts.clone());
Ok(compiled_contracts.clone())
}
fn compile_contracts<P: Platform>(
async fn compile_contracts<P: Platform>(
metadata: &Metadata,
mode: &SolcMode,
config: &Arguments,
@@ -489,18 +498,22 @@ fn compile_contracts<P: Platform>(
});
}
let compiler_output = compiler.try_build(compiler_path)?;
let compiler_output = compiler.try_build(compiler_path).await?;
Ok((compiler_version, compiler_output))
}
fn execute_corpus(args: &Arguments, tests: &[MetadataFile], span: Span) -> anyhow::Result<()> {
async fn execute_corpus(
args: &Arguments,
tests: &[MetadataFile],
span: Span,
) -> anyhow::Result<()> {
match (&args.leader, &args.follower) {
(TestingPlatform::Geth, TestingPlatform::Kitchensink) => {
run_driver::<Geth, Kitchensink>(args, tests, span)?
run_driver::<Geth, Kitchensink>(args, tests, span).await?
}
(TestingPlatform::Geth, TestingPlatform::Geth) => {
run_driver::<Geth, Geth>(args, tests, span)?
run_driver::<Geth, Geth>(args, tests, span).await?
}
_ => unimplemented!(),
}
@@ -508,27 +521,41 @@ fn execute_corpus(args: &Arguments, tests: &[MetadataFile], span: Span) -> anyho
Ok(())
}
fn compile_corpus(config: &Arguments, tests: &[MetadataFile], platform: &TestingPlatform, _: Span) {
tests.par_iter().for_each(|metadata| {
for mode in &metadata.solc_modes() {
async fn compile_corpus(
config: &Arguments,
tests: &[MetadataFile],
platform: &TestingPlatform,
_: Span,
) {
let tests = tests.iter().flat_map(|metadata| {
metadata
.solc_modes()
.into_iter()
.map(move |solc_mode| (metadata, solc_mode))
});
futures::stream::iter(tests)
.for_each_concurrent(None, |(metadata, mode)| async move {
match platform {
TestingPlatform::Geth => {
let _ = compile_contracts::<Geth>(
&metadata.content,
mode,
&mode,
config,
&Default::default(),
);
)
.await;
}
TestingPlatform::Kitchensink => {
let _ = compile_contracts::<Geth>(
&metadata.content,
mode,
&mode,
config,
&Default::default(),
);
)
.await;
}
};
}
});
}
})
.await;
}