Back to parallelise individual cases, report individual cases, address grumbles

This commit is contained in:
James Wilson
2025-08-13 13:35:11 +01:00
parent 45040016c2
commit a08cace3c4
2 changed files with 142 additions and 126 deletions
+15 -3
View File
@@ -96,9 +96,14 @@ pub struct Arguments {
#[arg(long, default_value = "1")]
pub number_of_nodes: usize,
/// Determines the amount of tokio worker threads that will will be used. Defaults to the number of CPU cores.
#[arg(long)]
pub number_of_threads: Option<usize>,
/// Determines the amount of tokio worker threads that will will be used.
#[arg(
long,
default_value_t = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
)]
pub number_of_threads: usize,
/// Determines the amount of concurrent tasks that will be spawned to run tests. Defaults to 10 x the number of nodes.
#[arg(long)]
@@ -138,6 +143,13 @@ impl Arguments {
panic!("should have a workdir configured")
}
/// Return the number of concurrent tasks to run. This is provided via the
/// `--number-concurrent-tasks` argument, and otherwise defaults to --number-of-nodes * 20.
pub fn number_of_concurrent_tasks(&self) -> usize {
self.number_concurrent_tasks
.unwrap_or(20 * self.number_of_nodes)
}
/// Try to parse `self.account` into a [PrivateKeySigner],
/// panicing on error.
pub fn wallet(&self) -> EthereumWallet {
+127 -123
View File
@@ -56,11 +56,12 @@ struct Test {
metadata: Metadata,
path: PathBuf,
mode: SolcMode,
cases: Vec<Case>,
case_idx: usize,
case: Case,
}
/// This represents the results that we gather from running test cases.
type CaseResults = Vec<Result<usize, anyhow::Error>>;
type CaseResult = Result<usize, anyhow::Error>;
fn main() -> anyhow::Result<()> {
let args = init_cli()?;
@@ -77,14 +78,8 @@ fn main() -> anyhow::Result<()> {
Ok(())
};
let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
if let Some(num_threads) = args.number_of_threads {
// Allow it to default to number of CPU cores if not specified.
runtime_builder.worker_threads(num_threads);
}
runtime_builder
tokio::runtime::Builder::new_multi_thread()
.worker_threads(args.number_of_threads)
.enable_all()
.build()
.expect("Failed building the Runtime")
@@ -147,7 +142,7 @@ where
L::Blockchain: revive_dt_node::Node + Send + Sync + 'static,
F::Blockchain: revive_dt_node::Node + Send + Sync + 'static,
{
let (report_tx, report_rx) = mpsc::unbounded_channel::<(Test, CaseResults)>();
let (report_tx, report_rx) = mpsc::unbounded_channel::<(Test, CaseResult)>();
let tests = prepare_tests::<L, F>(metadata_files);
let driver_task = start_driver_task::<L, F>(args, tests, span, report_tx)?;
@@ -167,7 +162,50 @@ where
{
metadata_files
.iter()
.filter(|file| match file.content.required_evm_version {
.flat_map(
|MetadataFile {
path,
content: metadata,
}| {
metadata
.cases
.iter()
.enumerate()
.flat_map(move |(case_idx, case)| {
metadata
.solc_modes()
.into_iter()
.map(move |solc_mode| (path, metadata, case_idx, case, solc_mode))
})
},
)
.filter(
|(metadata_file_path, metadata, _, _, _)| match metadata.ignore {
Some(true) => {
tracing::warn!(
metadata_file_path = %metadata_file_path.display(),
"Ignoring metadata file"
);
false
}
Some(false) | None => true,
},
)
.filter(
|(metadata_file_path, _, case_idx, case, _)| match case.ignore {
Some(true) => {
tracing::warn!(
metadata_file_path = %metadata_file_path.display(),
case_idx,
case_name = ?case.name,
"Ignoring case"
);
false
}
Some(false) | None => true,
},
)
.filter(|(metadata_file_path, metadata, ..)| match metadata.required_evm_version {
Some(evm_version_requirement) => {
let is_allowed = evm_version_requirement
.matches(&<L::Blockchain as revive_dt_node::Node>::evm_version())
@@ -176,7 +214,7 @@ where
if !is_allowed {
tracing::warn!(
metadata_file_path = %file.path.display(),
metadata_file_path = %metadata_file_path.display(),
leader_evm_version = %<L::Blockchain as revive_dt_node::Node>::evm_version(),
follower_evm_version = %<F::Blockchain as revive_dt_node::Node>::evm_version(),
version_requirement = %evm_version_requirement,
@@ -188,32 +226,14 @@ where
}
None => true,
})
.flat_map(|MetadataFile { path, content: metadata }| {
let cases = metadata
.cases
.iter()
.enumerate()
.filter_map(|(case_idx, case)| {
if case.ignore.unwrap_or_default() {
tracing::warn!(
metadata_file_path = %path.display(),
case_idx,
case_name = ?case.name,
"Ignoring case"
);
None
} else {
Some(case.clone())
}
})
.collect::<Vec<_>>();
metadata.solc_modes().into_iter().map(move |solc_mode| Test {
.map(|(metadata_file_path, metadata, case_idx, case, solc_mode)| {
Test {
metadata: metadata.clone(),
path: path.to_path_buf(),
path: metadata_file_path.to_path_buf(),
mode: solc_mode,
cases: cases.clone(),
})
case_idx,
case: case.clone(),
}
})
}
@@ -221,7 +241,7 @@ fn start_driver_task<L, F>(
args: &Arguments,
tests: impl Iterator<Item = Test>,
span: Span,
report_tx: mpsc::UnboundedSender<(Test, CaseResults)>,
report_tx: mpsc::UnboundedSender<(Test, CaseResult)>,
) -> anyhow::Result<impl Future<Output = ()>>
where
L: Platform,
@@ -232,9 +252,7 @@ where
let leader_nodes = Arc::new(NodePool::<L::Blockchain>::new(args)?);
let follower_nodes = Arc::new(NodePool::<F::Blockchain>::new(args)?);
let compilation_cache = Arc::new(RwLock::new(HashMap::new()));
let number_concurrent_tasks = args
.number_concurrent_tasks
.unwrap_or(args.number_of_nodes * 10);
let number_concurrent_tasks = args.number_of_concurrent_tasks();
Ok(futures::stream::iter(tests).for_each_concurrent(
// We want to limit the concurrent tasks here because:
@@ -255,110 +273,98 @@ where
let leader_node = leader_nodes.round_robbin();
let follower_node = follower_nodes.round_robbin();
let mut case_results = vec![];
for (case_idx, case) in test.cases.iter().enumerate() {
let tracing_span = tracing::span!(
Level::INFO,
"Running driver",
metadata_file_path = %test.path.display(),
case_idx = case_idx,
solc_mode = ?test.mode,
);
let tracing_span = tracing::span!(
Level::INFO,
"Running driver",
metadata_file_path = %test.path.display(),
case_idx = ?test.case_idx,
solc_mode = ?test.mode,
);
let result = handle_case_driver::<L, F>(
&test.path,
&test.metadata,
case_idx.into(),
case,
test.mode.clone(),
args,
compilation_cache.clone(),
leader_node,
follower_node,
span,
)
.instrument(tracing_span)
.await;
case_results.push(result);
}
let result = handle_case_driver::<L, F>(
&test.path,
&test.metadata,
test.case_idx.into(),
&test.case,
test.mode.clone(),
args,
compilation_cache.clone(),
leader_node,
follower_node,
span,
)
.instrument(tracing_span)
.await;
report_tx
.send((test, case_results))
.send((test, result))
.expect("Failed to send report");
}
},
))
}
async fn start_reporter_task(mut report_rx: mpsc::UnboundedReceiver<(Test, CaseResults)>) {
async fn start_reporter_task(mut report_rx: mpsc::UnboundedReceiver<(Test, CaseResult)>) {
let start = Instant::now();
const GREEN: &str = "\x1B[32m";
const RED: &str = "\x1B[31m";
const RESET: &str = "\x1B[0m";
const COLOUR_RESET: &str = "\x1B[0m";
const BOLD: &str = "\x1B[1m";
const BOLD_RESET: &str = "\x1B[22m";
let mut number_of_successes = 0;
let mut number_of_failures = 0;
let mut failures = vec![];
// Wait for reports to come from our test runner. When the channel closes, this ends.
while let Some((test, case_results)) = report_rx.recv().await {
let tracing_span = tracing::span!(
Level::INFO,
"Test completed",
metadata_file_path = %test.path.display(),
solc_mode = ?test.mode,
);
let _ = tracing_span.enter();
while let Some((test, case_result)) = report_rx.recv().await {
let case_name = test.case.name.as_deref().unwrap_or("unnamed_case");
let case_idx = test.case_idx;
let test_path = test.path.display();
let test_mode = test.mode.clone();
let contains_failures = case_results.iter().any(|res| res.is_err());
if !contains_failures {
eprintln!(
"{}Succeeded:{} {} - {:?}",
GREEN,
RESET,
test.path.display(),
test.mode
)
} else {
eprintln!(
"{}Failed:{} {} - {:?}",
RED,
RESET,
test.path.display(),
test.mode
)
};
for (case_idx, (case, result)) in test.cases.iter().zip(case_results).enumerate() {
let case_name = case.name.as_deref().unwrap_or("Unnamed case");
match result {
Ok(_inputs_executed) => {
number_of_successes += 1;
eprintln!(" {GREEN}Case Succeeded:{RESET} {case_name} - Case Idx: {case_idx}",);
}
Err(err) => {
number_of_failures += 1;
eprintln!(" {RED}Case Failed:{RESET} {case_name} - Case Idx: {case_idx}",);
tracing::info!(
case_idx,
case_name,
%err,
"Case failed with error"
)
}
match case_result {
Ok(_inputs) => {
number_of_successes += 1;
eprintln!(
"{GREEN}Case Succeeded:{COLOUR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode:?})"
);
}
Err(err) => {
number_of_failures += 1;
eprintln!(
"{RED}Case Failed:{COLOUR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode:?})"
);
failures.push((test, err));
}
}
eprintln!();
}
let elapsed = start.elapsed();
// Now, log the failures with more complete errors at the bottom, like `cargo test` does, so
// that we don't have to scroll through the entire output to find them.
if !failures.is_empty() {
eprintln!("{BOLD}Failures:{BOLD_RESET}\n");
for failure in failures {
let (test, err) = failure;
let case_name = test.case.name.as_deref().unwrap_or("unnamed_case");
let case_idx = test.case_idx;
let test_path = test.path.display();
let test_mode = test.mode.clone();
eprintln!(
"---- {RED}Case Failed:{COLOUR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode:?}) ----\n\n{err}\n"
);
}
}
// Summary at the end.
eprintln!(
"{GREEN}{}{RESET} cases succeeded, {RED}{}{RESET} cases failed in {} seconds",
number_of_successes,
number_of_failures,
"{} cases: {GREEN}{number_of_successes}{COLOUR_RESET} cases succeeded, {RED}{number_of_failures}{COLOUR_RESET} cases failed in {} seconds",
number_of_successes + number_of_failures,
elapsed.as_secs()
);
}
@@ -512,11 +518,9 @@ where
);
let Some(leader_library_address) = leader_receipt.contract_address else {
tracing::error!("Contract deployment transaction didn't return an address");
anyhow::bail!("Contract deployment didn't return an address");
};
let Some(follower_library_address) = follower_receipt.contract_address else {
tracing::error!("Contract deployment transaction didn't return an address");
anyhow::bail!("Contract deployment didn't return an address");
};
@@ -554,7 +558,7 @@ where
let follower_key = (
metadata_file_path.to_path_buf(),
mode.clone(),
L::config_id(),
F::config_id(),
);
{
let mut cache = compilation_cache.write().await;