Parallelize Cases (#109)

* Parallelize over cases

* Rename the state and driver

* Parallelize execution

* Update the default config of the tool

* Make codebase async

* Fix machete

* Fix tests & clear node directories before startup

* Cleanup the cleanup logic

* Rename geth node
This commit is contained in:
Omar
2025-08-01 14:00:08 +03:00
committed by GitHub
parent 330a773a1c
commit 56c2fe8c0c
30 changed files with 1264 additions and 1534 deletions
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -26,7 +26,7 @@ pub trait Platform {
pub struct Geth;
impl Platform for Geth {
type Blockchain = geth::Instance;
type Blockchain = geth::GethNode;
type Compiler = solc::Solc;
fn config_id() -> TestingPlatform {
+461 -78
View File
@@ -1,37 +1,75 @@
use std::{collections::HashMap, sync::LazyLock};
use std::{
collections::HashMap,
path::Path,
sync::{Arc, LazyLock},
};
use alloy::{
json_abi::JsonAbi,
network::{Ethereum, TransactionBuilder},
primitives::Address,
rpc::types::TransactionRequest,
};
use anyhow::Context;
use clap::Parser;
use rayon::{ThreadPoolBuilder, prelude::*};
use futures::StreamExt;
use revive_dt_common::iterators::FilesWithExtensionIterator;
use revive_dt_node_interaction::EthereumNode;
use semver::Version;
use temp_dir::TempDir;
use tokio::sync::{Mutex, RwLock};
use tracing::{Instrument, Level};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use revive_dt_compiler::SolidityCompiler;
use revive_dt_compiler::{Compiler, CompilerOutput};
use revive_dt_config::*;
use revive_dt_core::{
Geth, Kitchensink, Platform,
driver::{Driver, State},
driver::{CaseDriver, CaseState},
};
use revive_dt_format::{
case::{Case, CaseIdx},
corpus::Corpus,
input::Input,
metadata::{ContractInstance, ContractPathAndIdent, Metadata, MetadataFile},
mode::SolcMode,
};
use revive_dt_format::{corpus::Corpus, metadata::MetadataFile};
use revive_dt_node::pool::NodePool;
use revive_dt_report::reporter::{Report, Span};
use temp_dir::TempDir;
use tracing::Level;
use tracing_subscriber::{EnvFilter, FmtSubscriber};
static TEMP_DIR: LazyLock<TempDir> = LazyLock::new(|| TempDir::new().unwrap());
type CompilationCache<'a> = Arc<
RwLock<
HashMap<
(&'a Path, SolcMode, TestingPlatform),
Arc<Mutex<Option<Arc<(Version, CompilerOutput)>>>>,
>,
>,
>;
fn main() -> anyhow::Result<()> {
let args = init_cli()?;
for (corpus, tests) in collect_corpora(&args)? {
let span = Span::new(corpus, args.clone())?;
match &args.compile_only {
Some(platform) => compile_corpus(&args, &tests, platform, span),
None => execute_corpus(&args, &tests, span)?,
let body = async {
for (corpus, tests) in collect_corpora(&args)? {
let span = Span::new(corpus, args.clone())?;
match &args.compile_only {
Some(platform) => compile_corpus(&args, &tests, platform, span).await,
None => execute_corpus(&args, &tests, span).await?,
}
Report::save()?;
}
Ok(())
};
Report::save()?;
}
Ok(())
tokio::runtime::Builder::new_multi_thread()
.worker_threads(args.number_of_threads)
.enable_all()
.build()
.expect("Failed building the Runtime")
.block_on(body)
}
fn init_cli() -> anyhow::Result<Arguments> {
@@ -62,10 +100,6 @@ fn init_cli() -> anyhow::Result<Arguments> {
}
tracing::info!("workdir: {}", args.directory().display());
ThreadPoolBuilder::new()
.num_threads(args.workers)
.build_global()?;
Ok(args)
}
@@ -83,7 +117,11 @@ fn collect_corpora(args: &Arguments) -> anyhow::Result<HashMap<Corpus, Vec<Metad
Ok(corpora)
}
fn run_driver<L, F>(args: &Arguments, tests: &[MetadataFile], span: Span) -> anyhow::Result<()>
async fn run_driver<L, F>(
args: &Arguments,
tests: &[MetadataFile],
span: Span,
) -> anyhow::Result<()>
where
L: Platform,
F: Platform,
@@ -93,61 +131,389 @@ where
let leader_nodes = NodePool::<L::Blockchain>::new(args)?;
let follower_nodes = NodePool::<F::Blockchain>::new(args)?;
tests.par_iter().for_each(
|MetadataFile {
content: metadata,
path: metadata_file_path,
}| {
// Starting a new tracing span for this metadata file. This allows our logs to be clear
// about which metadata file the logs belong to. We can add other information into this
// as well to be able to associate the logs with the correct metadata file and case
// that's being executed.
let tracing_span = tracing::span!(
Level::INFO,
"Running driver",
metadata_file_path = metadata_file_path.display().to_string(),
);
let _guard = tracing_span.enter();
let test_cases = tests
.iter()
.flat_map(
|MetadataFile {
path,
content: metadata,
}| {
metadata
.cases
.iter()
.enumerate()
.flat_map(move |(case_idx, case)| {
metadata
.solc_modes()
.into_iter()
.map(move |solc_mode| (path, metadata, case_idx, case, solc_mode))
})
},
)
.collect::<Vec<_>>();
let mut driver = Driver::<L, F>::new(
metadata,
args,
leader_nodes.round_robbin(),
follower_nodes.round_robbin(),
);
let execution_result = driver.execute(span);
tracing::info!(
case_success_count = execution_result.successful_cases_count,
case_failure_count = execution_result.failed_cases_count,
"Execution completed"
);
let mut error_count = 0;
for result in execution_result.results.iter() {
if !result.is_success() {
tracing::error!(execution_error = ?result, "Encountered an error");
error_count += 1;
let compilation_cache = Arc::new(RwLock::new(HashMap::new()));
futures::stream::iter(test_cases)
.for_each_concurrent(
None,
|(metadata_file_path, metadata, case_idx, case, solc_mode)| {
let compilation_cache = compilation_cache.clone();
let leader_node = leader_nodes.round_robbin();
let follower_node = follower_nodes.round_robbin();
let tracing_span = tracing::span!(
Level::INFO,
"Running driver",
metadata_file_path = %metadata_file_path.display(),
case_idx = case_idx,
solc_mode = ?solc_mode,
);
async move {
let result = handle_case_driver::<L, F>(
metadata_file_path.as_path(),
metadata,
case_idx.into(),
case,
solc_mode,
args,
compilation_cache.clone(),
leader_node,
follower_node,
span,
)
.await;
match result {
Ok(inputs_executed) => {
tracing::info!(inputs_executed, "Execution succeeded")
}
Err(error) => tracing::info!(%error, "Execution failed"),
}
tracing::info!("Execution completed");
}
}
if error_count == 0 {
tracing::info!("Execution succeeded");
} else {
tracing::info!("Execution failed");
}
},
);
.instrument(tracing_span)
},
)
.await;
Ok(())
}
fn execute_corpus(args: &Arguments, tests: &[MetadataFile], span: Span) -> anyhow::Result<()> {
#[allow(clippy::too_many_arguments)]
async fn handle_case_driver<'a, L, F>(
metadata_file_path: &'a Path,
metadata: &'a Metadata,
case_idx: CaseIdx,
case: &Case,
mode: SolcMode,
config: &Arguments,
compilation_cache: CompilationCache<'a>,
leader_node: &L::Blockchain,
follower_node: &F::Blockchain,
_: Span,
) -> anyhow::Result<usize>
where
L: Platform,
F: Platform,
L::Blockchain: revive_dt_node::Node + Send + Sync + 'static,
F::Blockchain: revive_dt_node::Node + Send + Sync + 'static,
{
let leader_pre_link_contracts = get_or_build_contracts::<L>(
metadata,
metadata_file_path,
mode.clone(),
config,
compilation_cache.clone(),
&HashMap::new(),
)
.await?;
let follower_pre_link_contracts = get_or_build_contracts::<F>(
metadata,
metadata_file_path,
mode.clone(),
config,
compilation_cache.clone(),
&HashMap::new(),
)
.await?;
let mut leader_deployed_libraries = HashMap::new();
let mut follower_deployed_libraries = HashMap::new();
let mut contract_sources = metadata.contract_sources()?;
for library_instance in metadata
.libraries
.iter()
.flatten()
.flat_map(|(_, map)| map.values())
{
let ContractPathAndIdent {
contract_source_path: library_source_path,
contract_ident: library_ident,
} = contract_sources
.remove(library_instance)
.context("Failed to find the contract source")?;
let (leader_code, leader_abi) = leader_pre_link_contracts
.1
.contracts
.get(&library_source_path)
.and_then(|contracts| contracts.get(library_ident.as_str()))
.context("Declared library was not compiled")?;
let (follower_code, follower_abi) = follower_pre_link_contracts
.1
.contracts
.get(&library_source_path)
.and_then(|contracts| contracts.get(library_ident.as_str()))
.context("Declared library was not compiled")?;
let leader_code = match alloy::hex::decode(leader_code) {
Ok(code) => code,
Err(error) => {
tracing::error!(
?error,
contract_source_path = library_source_path.display().to_string(),
contract_ident = library_ident.as_ref(),
"Failed to hex-decode byte code - This could possibly mean that the bytecode requires linking"
);
anyhow::bail!("Failed to hex-decode the byte code {}", error)
}
};
let follower_code = match alloy::hex::decode(follower_code) {
Ok(code) => code,
Err(error) => {
tracing::error!(
?error,
contract_source_path = library_source_path.display().to_string(),
contract_ident = library_ident.as_ref(),
"Failed to hex-decode byte code - This could possibly mean that the bytecode requires linking"
);
anyhow::bail!("Failed to hex-decode the byte code {}", error)
}
};
// Getting the deployer address from the cases themselves. This is to ensure that we're
// doing the deployments from different accounts and therefore we're not slowed down by
// the nonce.
let deployer_address = case
.inputs
.iter()
.map(|input| input.caller)
.next()
.unwrap_or(Input::default_caller());
let leader_tx = TransactionBuilder::<Ethereum>::with_deploy_code(
TransactionRequest::default().from(deployer_address),
leader_code,
);
let follower_tx = TransactionBuilder::<Ethereum>::with_deploy_code(
TransactionRequest::default().from(deployer_address),
follower_code,
);
let leader_receipt = match leader_node.execute_transaction(leader_tx).await {
Ok(receipt) => receipt,
Err(error) => {
tracing::error!(
node = std::any::type_name::<L>(),
?error,
"Contract deployment transaction failed."
);
return Err(error);
}
};
let follower_receipt = match follower_node.execute_transaction(follower_tx).await {
Ok(receipt) => receipt,
Err(error) => {
tracing::error!(
node = std::any::type_name::<F>(),
?error,
"Contract deployment transaction failed."
);
return Err(error);
}
};
let Some(leader_library_address) = leader_receipt.contract_address else {
tracing::error!("Contract deployment transaction didn't return an address");
anyhow::bail!("Contract deployment didn't return an address");
};
let Some(follower_library_address) = follower_receipt.contract_address else {
tracing::error!("Contract deployment transaction didn't return an address");
anyhow::bail!("Contract deployment didn't return an address");
};
leader_deployed_libraries.insert(
library_instance.clone(),
(leader_library_address, leader_abi.clone()),
);
follower_deployed_libraries.insert(
library_instance.clone(),
(follower_library_address, follower_abi.clone()),
);
}
let metadata_file_contains_libraries = metadata
.libraries
.iter()
.flat_map(|map| map.iter())
.flat_map(|(_, value)| value.iter())
.next()
.is_some();
let compiled_contracts_require_linking = leader_pre_link_contracts
.1
.contracts
.values()
.chain(follower_pre_link_contracts.1.contracts.values())
.flat_map(|value| value.values())
.any(|(code, _)| !code.chars().all(|char| char.is_ascii_hexdigit()));
let (leader_compiled_contracts, follower_compiled_contracts) =
if metadata_file_contains_libraries && compiled_contracts_require_linking {
let leader_key = (metadata_file_path, mode.clone(), L::config_id());
let follower_key = (metadata_file_path, mode.clone(), L::config_id());
{
let mut cache = compilation_cache.write().await;
cache.remove(&leader_key);
cache.remove(&follower_key);
}
let leader_post_link_contracts = get_or_build_contracts::<L>(
metadata,
metadata_file_path,
mode.clone(),
config,
compilation_cache.clone(),
&leader_deployed_libraries,
)
.await?;
let follower_post_link_contracts = get_or_build_contracts::<F>(
metadata,
metadata_file_path,
mode.clone(),
config,
compilation_cache,
&follower_deployed_libraries,
)
.await?;
(leader_post_link_contracts, follower_post_link_contracts)
} else {
(leader_pre_link_contracts, follower_pre_link_contracts)
};
let leader_state = CaseState::<L>::new(
leader_compiled_contracts.0.clone(),
leader_compiled_contracts.1.contracts.clone(),
leader_deployed_libraries,
);
let follower_state = CaseState::<F>::new(
follower_compiled_contracts.0.clone(),
follower_compiled_contracts.1.contracts.clone(),
follower_deployed_libraries,
);
let mut driver = CaseDriver::<L, F>::new(
metadata,
case,
case_idx,
leader_node,
follower_node,
leader_state,
follower_state,
);
driver.execute().await
}
async fn get_or_build_contracts<'a, P: Platform>(
metadata: &'a Metadata,
metadata_file_path: &'a Path,
mode: SolcMode,
config: &Arguments,
compilation_cache: CompilationCache<'a>,
deployed_libraries: &HashMap<ContractInstance, (Address, JsonAbi)>,
) -> anyhow::Result<Arc<(Version, CompilerOutput)>> {
let key = (metadata_file_path, mode.clone(), P::config_id());
if let Some(compilation_artifact) = compilation_cache.read().await.get(&key).cloned() {
let mut compilation_artifact = compilation_artifact.lock().await;
match *compilation_artifact {
Some(ref compiled_contracts) => {
tracing::debug!(?key, "Compiled contracts cache hit");
return Ok(compiled_contracts.clone());
}
None => {
tracing::debug!(?key, "Compiled contracts cache miss");
let compiled_contracts = Arc::new(
compile_contracts::<P>(metadata, &mode, config, deployed_libraries).await?,
);
*compilation_artifact = Some(compiled_contracts.clone());
return Ok(compiled_contracts.clone());
}
}
};
tracing::debug!(?key, "Compiled contracts cache miss");
let mutex = {
let mut compilation_cache = compilation_cache.write().await;
let mutex = Arc::new(Mutex::new(None));
compilation_cache.insert(key, mutex.clone());
mutex
};
let mut compilation_artifact = mutex.lock().await;
let compiled_contracts =
Arc::new(compile_contracts::<P>(metadata, &mode, config, deployed_libraries).await?);
*compilation_artifact = Some(compiled_contracts.clone());
Ok(compiled_contracts.clone())
}
async fn compile_contracts<P: Platform>(
metadata: &Metadata,
mode: &SolcMode,
config: &Arguments,
deployed_libraries: &HashMap<ContractInstance, (Address, JsonAbi)>,
) -> anyhow::Result<(Version, CompilerOutput)> {
let compiler_version_or_requirement = mode.compiler_version_to_use(config.solc.clone());
let compiler_path =
P::Compiler::get_compiler_executable(config, compiler_version_or_requirement).await?;
let compiler_version = P::Compiler::new(compiler_path.clone()).version()?;
let compiler = Compiler::<P::Compiler>::new()
.with_allow_path(metadata.directory()?)
.with_optimization(mode.solc_optimize());
let mut compiler = metadata
.files_to_compile()?
.try_fold(compiler, |compiler, path| compiler.with_source(&path))?;
for (library_instance, (library_address, _)) in deployed_libraries.iter() {
let library_ident = &metadata
.contracts
.as_ref()
.and_then(|contracts| contracts.get(library_instance))
.expect("Impossible for library to not be found in contracts")
.contract_ident;
// Note the following: we need to tell solc which files require the libraries to be
// linked into them. We do not have access to this information and therefore we choose
// an easier, yet more compute intensive route, of telling solc that all of the files
// need to link the library and it will only perform the linking for the files that do
// actually need the library.
compiler = FilesWithExtensionIterator::new(metadata.directory()?)
.with_allowed_extension("sol")
.fold(compiler, |compiler, path| {
compiler.with_library(&path, library_ident.as_str(), *library_address)
});
}
let compiler_output = compiler.try_build(compiler_path).await?;
Ok((compiler_version, compiler_output))
}
async fn execute_corpus(
args: &Arguments,
tests: &[MetadataFile],
span: Span,
) -> anyhow::Result<()> {
match (&args.leader, &args.follower) {
(TestingPlatform::Geth, TestingPlatform::Kitchensink) => {
run_driver::<Geth, Kitchensink>(args, tests, span)?
run_driver::<Geth, Kitchensink>(args, tests, span).await?
}
(TestingPlatform::Geth, TestingPlatform::Geth) => {
run_driver::<Geth, Geth>(args, tests, span)?
run_driver::<Geth, Geth>(args, tests, span).await?
}
_ => unimplemented!(),
}
@@ -155,24 +521,41 @@ fn execute_corpus(args: &Arguments, tests: &[MetadataFile], span: Span) -> anyho
Ok(())
}
fn compile_corpus(
async fn compile_corpus(
config: &Arguments,
tests: &[MetadataFile],
platform: &TestingPlatform,
span: Span,
_: Span,
) {
tests.par_iter().for_each(|metadata| {
for mode in &metadata.solc_modes() {
let tests = tests.iter().flat_map(|metadata| {
metadata
.solc_modes()
.into_iter()
.map(move |solc_mode| (metadata, solc_mode))
});
futures::stream::iter(tests)
.for_each_concurrent(None, |(metadata, mode)| async move {
match platform {
TestingPlatform::Geth => {
let mut state = State::<Geth>::new(config, span);
let _ = state.build_contracts(mode, metadata);
let _ = compile_contracts::<Geth>(
&metadata.content,
&mode,
config,
&Default::default(),
)
.await;
}
TestingPlatform::Kitchensink => {
let mut state = State::<Kitchensink>::new(config, span);
let _ = state.build_contracts(mode, metadata);
let _ = compile_contracts::<Geth>(
&metadata.content,
&mode,
config,
&Default::default(),
)
.await;
}
};
}
});
}
})
.await;
}