Refactor the Global Configuration & Context (#157)

* Cleanup the config

* Update usage guides

* Update the run script

* Fix tests

* Use kitchensink in tests

* Use shared node more often in tests
This commit is contained in:
Omar
2025-09-04 17:25:05 +03:00
committed by GitHub
parent 7878f68c26
commit c2526e48e7
23 changed files with 805 additions and 441 deletions
+1 -1
View File
@@ -18,7 +18,7 @@ use alloy::{
primitives::Address,
rpc::types::{TransactionRequest, trace::geth::DiffMode},
};
use anyhow::Context;
use anyhow::Context as _;
use futures::TryStreamExt;
use indexmap::IndexMap;
use revive_dt_format::traits::{ResolutionContext, ResolverApi};
+63 -98
View File
@@ -5,7 +5,7 @@ use std::{
collections::{BTreeMap, HashMap},
io::{BufWriter, Write, stderr},
path::Path,
sync::{Arc, LazyLock},
sync::Arc,
time::Instant,
};
@@ -13,7 +13,7 @@ use alloy::{
network::{Ethereum, TransactionBuilder},
rpc::types::TransactionRequest,
};
use anyhow::Context;
use anyhow::Context as _;
use clap::Parser;
use futures::stream;
use futures::{Stream, StreamExt};
@@ -24,15 +24,13 @@ use revive_dt_report::{
TestSpecificReporter, TestSpecifier,
};
use serde_json::{Value, json};
use temp_dir::TempDir;
use tokio::try_join;
use tracing::{debug, error, info, info_span, instrument};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use revive_dt_common::{iterators::EitherIter, types::Mode};
use revive_dt_compiler::{CompilerOutput, SolidityCompiler};
use revive_dt_config::*;
use revive_dt_config::{Context, *};
use revive_dt_core::{
Geth, Kitchensink, Platform,
driver::{CaseDriver, CaseState},
@@ -48,58 +46,8 @@ use revive_dt_node::{Node, pool::NodePool};
use crate::cached_compiler::CachedCompiler;
static TEMP_DIR: LazyLock<TempDir> = LazyLock::new(|| TempDir::new().unwrap());
fn main() -> anyhow::Result<()> {
let (args, _guard) = init_cli().context("Failed to initialize CLI and tracing subscriber")?;
info!(
leader = args.leader.to_string(),
follower = args.follower.to_string(),
working_directory = %args.directory().display(),
number_of_nodes = args.number_of_nodes,
invalidate_compilation_cache = args.invalidate_compilation_cache,
"Differential testing tool has been initialized"
);
let (reporter, report_aggregator_task) = ReportAggregator::new(args.clone()).into_task();
let number_of_threads = args.number_of_threads;
let body = async move {
let tests = collect_corpora(&args)
.context("Failed to collect corpus files from provided arguments")?
.into_iter()
.inspect(|(corpus, _)| {
reporter
.report_corpus_file_discovery_event(corpus.clone())
.expect("Can't fail")
})
.flat_map(|(_, files)| files.into_iter())
.inspect(|metadata_file| {
reporter
.report_metadata_file_discovery_event(
metadata_file.metadata_file_path.clone(),
metadata_file.content.clone(),
)
.expect("Can't fail")
})
.collect::<Vec<_>>();
execute_corpus(&args, &tests, reporter, report_aggregator_task)
.await
.context("Failed to execute corpus")?;
Ok(())
};
tokio::runtime::Builder::new_multi_thread()
.worker_threads(number_of_threads)
.enable_all()
.build()
.expect("Failed building the Runtime")
.block_on(body)
}
fn init_cli() -> anyhow::Result<(Arguments, WorkerGuard)> {
let (writer, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
let (writer, _guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
.lossy(false)
// Assuming that each line contains 255 characters and that each character is one byte, then
// this means that our buffer is about 4GBs large.
@@ -118,31 +66,51 @@ fn init_cli() -> anyhow::Result<(Arguments, WorkerGuard)> {
tracing::subscriber::set_global_default(subscriber)?;
info!("Differential testing tool is starting");
let mut args = Arguments::parse();
let context = Context::try_parse()?;
let (reporter, report_aggregator_task) = ReportAggregator::new(context.clone()).into_task();
if args.corpus.is_empty() {
anyhow::bail!("no test corpus specified");
}
match context {
Context::ExecuteTests(context) => {
let tests = collect_corpora(&context)
.context("Failed to collect corpus files from provided arguments")?
.into_iter()
.inspect(|(corpus, _)| {
reporter
.report_corpus_file_discovery_event(corpus.clone())
.expect("Can't fail")
})
.flat_map(|(_, files)| files.into_iter())
.inspect(|metadata_file| {
reporter
.report_metadata_file_discovery_event(
metadata_file.metadata_file_path.clone(),
metadata_file.content.clone(),
)
.expect("Can't fail")
})
.collect::<Vec<_>>();
match args.working_directory.as_ref() {
Some(dir) => {
if !dir.exists() {
anyhow::bail!("workdir {} does not exist", dir.display());
}
}
None => {
args.temp_dir = Some(&TEMP_DIR);
tokio::runtime::Builder::new_multi_thread()
.worker_threads(context.concurrency_configuration.number_of_threads)
.enable_all()
.build()
.expect("Failed building the Runtime")
.block_on(async move {
execute_corpus(context, &tests, reporter, report_aggregator_task)
.await
.context("Failed to execute corpus")
})
}
}
Ok((args, guard))
}
#[instrument(level = "debug", name = "Collecting Corpora", skip_all)]
fn collect_corpora(args: &Arguments) -> anyhow::Result<HashMap<Corpus, Vec<MetadataFile>>> {
fn collect_corpora(
context: &ExecutionContext,
) -> anyhow::Result<HashMap<Corpus, Vec<MetadataFile>>> {
let mut corpora = HashMap::new();
for path in &args.corpus {
for path in &context.corpus {
let span = info_span!("Processing corpus file", path = %path.display());
let _guard = span.enter();
@@ -160,7 +128,7 @@ fn collect_corpora(args: &Arguments) -> anyhow::Result<HashMap<Corpus, Vec<Metad
}
async fn run_driver<L, F>(
args: &Arguments,
context: ExecutionContext,
metadata_files: &[MetadataFile],
reporter: Reporter,
report_aggregator_task: impl Future<Output = anyhow::Result<()>>,
@@ -171,20 +139,20 @@ where
L::Blockchain: revive_dt_node::Node + Send + Sync + 'static,
F::Blockchain: revive_dt_node::Node + Send + Sync + 'static,
{
let leader_nodes =
NodePool::<L::Blockchain>::new(args).context("Failed to initialize leader node pool")?;
let follower_nodes =
NodePool::<F::Blockchain>::new(args).context("Failed to initialize follower node pool")?;
let leader_nodes = NodePool::<L::Blockchain>::new(context.clone())
.context("Failed to initialize leader node pool")?;
let follower_nodes = NodePool::<F::Blockchain>::new(context.clone())
.context("Failed to initialize follower node pool")?;
let tests_stream = tests_stream(
args,
&context,
metadata_files.iter(),
&leader_nodes,
&follower_nodes,
reporter.clone(),
)
.await;
let driver_task = start_driver_task::<L, F>(args, tests_stream)
let driver_task = start_driver_task::<L, F>(&context, tests_stream)
.await
.context("Failed to start driver task")?;
let cli_reporting_task = start_cli_reporting_task(reporter);
@@ -196,7 +164,7 @@ where
}
async fn tests_stream<'a, L, F>(
args: &Arguments,
args: &ExecutionContext,
metadata_files: impl IntoIterator<Item = &'a MetadataFile> + Clone,
leader_node_pool: &'a NodePool<L::Blockchain>,
follower_node_pool: &'a NodePool<F::Blockchain>,
@@ -320,7 +288,7 @@ where
}
async fn start_driver_task<'a, L, F>(
args: &Arguments,
context: &ExecutionContext,
tests: impl Stream<Item = Test<'a, L, F>>,
) -> anyhow::Result<impl Future<Output = ()>>
where
@@ -333,25 +301,22 @@ where
{
info!("Starting driver task");
let number_concurrent_tasks = args.number_of_concurrent_tasks();
let cached_compiler = Arc::new(
CachedCompiler::new(
args.directory().join("compilation_cache"),
args.invalidate_compilation_cache,
context
.working_directory
.as_path()
.join("compilation_cache"),
context
.compilation_configuration
.invalidate_compilation_cache,
)
.await
.context("Failed to initialize cached compiler")?,
);
Ok(tests.for_each_concurrent(
// We want to limit the concurrent tasks here because:
//
// 1. We don't want to overwhelm the nodes with too many requests, leading to responses timing out.
// 2. We don't want to open too many files at once, leading to the OS running out of file descriptors.
//
// By default, we allow maximum of 10 ongoing requests per node in order to limit (1), and assume that
// this number will automatically be low enough to address (2). The user can override this.
Some(number_concurrent_tasks),
context.concurrency_configuration.concurrency_limit(),
move |test| {
let cached_compiler = cached_compiler.clone();
@@ -387,8 +352,7 @@ where
))
}
#[allow(clippy::uninlined_format_args)]
#[allow(irrefutable_let_patterns)]
#[allow(irrefutable_let_patterns, clippy::uninlined_format_args)]
async fn start_cli_reporting_task(reporter: Reporter) {
let mut aggregator_events_rx = reporter.subscribe().await.expect("Can't fail");
drop(reporter);
@@ -710,17 +674,18 @@ where
}
async fn execute_corpus(
args: &Arguments,
context: ExecutionContext,
tests: &[MetadataFile],
reporter: Reporter,
report_aggregator_task: impl Future<Output = anyhow::Result<()>>,
) -> anyhow::Result<()> {
match (&args.leader, &args.follower) {
match (&context.leader, &context.follower) {
(TestingPlatform::Geth, TestingPlatform::Kitchensink) => {
run_driver::<Geth, Kitchensink>(args, tests, reporter, report_aggregator_task).await?
run_driver::<Geth, Kitchensink>(context, tests, reporter, report_aggregator_task)
.await?
}
(TestingPlatform::Geth, TestingPlatform::Geth) => {
run_driver::<Geth, Geth>(args, tests, reporter, report_aggregator_task).await?
run_driver::<Geth, Geth>(context, tests, reporter, report_aggregator_task).await?
}
_ => unimplemented!(),
}