From 46aea0890d55088bfbdcaecf467ff2b70d1898f8 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 13 Aug 2025 14:10:26 +0100 Subject: [PATCH] Split reporter and case runner, use channels to pass test reports (#137) * Use channels to send data to reporting thread and avoid hangs / mutex / duration. Limit max concurrent tasks to avoid too many open files * More appropriate name for dirver/reporter task fns * Back to parallelise individual cases, report individual cases, address grumbles * newline before 'Failures' title in report --- crates/config/src/lib.rs | 20 ++- crates/core/src/main.rs | 361 ++++++++++++++++++++------------------- 2 files changed, 203 insertions(+), 178 deletions(-) diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index 67ed625..b7871fb 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -96,10 +96,19 @@ pub struct Arguments { #[arg(long, default_value = "1")] pub number_of_nodes: usize, - /// Determines the amount of threads that will will be used. - #[arg(long, default_value = "12")] + /// Determines the amount of tokio worker threads that will will be used. + #[arg( + long, + default_value_t = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + )] pub number_of_threads: usize, + /// Determines the amount of concurrent tasks that will be spawned to run tests. Defaults to 10 x the number of nodes. + #[arg(long)] + pub number_concurrent_tasks: Option, + /// Extract problems back to the test corpus. #[arg(short, long = "extract-problems")] pub extract_problems: bool, @@ -134,6 +143,13 @@ impl Arguments { panic!("should have a workdir configured") } + /// Return the number of concurrent tasks to run. This is provided via the + /// `--number-concurrent-tasks` argument, and otherwise defaults to --number-of-nodes * 20. + pub fn number_of_concurrent_tasks(&self) -> usize { + self.number_concurrent_tasks + .unwrap_or(20 * self.number_of_nodes) + } + /// Try to parse `self.account` into a [PrivateKeySigner], /// panicing on error. pub fn wallet(&self) -> EthereumWallet { diff --git a/crates/core/src/main.rs b/crates/core/src/main.rs index 81a688f..55ec7d3 100644 --- a/crates/core/src/main.rs +++ b/crates/core/src/main.rs @@ -1,6 +1,6 @@ use std::{ collections::HashMap, - path::Path, + path::{Path, PathBuf}, sync::{Arc, LazyLock}, time::Instant, }; @@ -18,7 +18,7 @@ use revive_dt_common::iterators::FilesWithExtensionIterator; use revive_dt_node_interaction::EthereumNode; use semver::Version; use temp_dir::TempDir; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock, mpsc}; use tracing::{Instrument, Level}; use tracing_subscriber::{EnvFilter, FmtSubscriber}; @@ -41,15 +41,28 @@ use revive_dt_report::reporter::{Report, Span}; static TEMP_DIR: LazyLock = LazyLock::new(|| TempDir::new().unwrap()); -type CompilationCache<'a> = Arc< +type CompilationCache = Arc< RwLock< HashMap< - (&'a Path, SolcMode, TestingPlatform), + (PathBuf, SolcMode, TestingPlatform), Arc>>>, >, >, >; +/// this represents a single "test"; a mode, path and collection of cases. +#[derive(Clone)] +struct Test { + metadata: Metadata, + path: PathBuf, + mode: SolcMode, + case_idx: usize, + case: Case, +} + +/// This represents the results that we gather from running test cases. +type CaseResult = Result; + fn main() -> anyhow::Result<()> { let args = init_cli()?; @@ -120,7 +133,7 @@ fn collect_corpora(args: &Arguments) -> anyhow::Result( args: &Arguments, - tests: &[MetadataFile], + metadata_files: &[MetadataFile], span: Span, ) -> anyhow::Result<()> where @@ -129,10 +142,25 @@ where L::Blockchain: revive_dt_node::Node + Send + Sync + 'static, F::Blockchain: revive_dt_node::Node + Send + Sync + 'static, { - let leader_nodes = NodePool::::new(args)?; - let follower_nodes = NodePool::::new(args)?; + let (report_tx, report_rx) = mpsc::unbounded_channel::<(Test, CaseResult)>(); - let test_cases = tests + let tests = prepare_tests::(metadata_files); + let driver_task = start_driver_task::(args, tests, span, report_tx)?; + let status_reporter_task = start_reporter_task(report_rx); + + tokio::join!(status_reporter_task, driver_task); + + Ok(()) +} + +fn prepare_tests(metadata_files: &[MetadataFile]) -> impl Iterator +where + L: Platform, + F: Platform, + L::Blockchain: revive_dt_node::Node + Send + Sync + 'static, + F::Blockchain: revive_dt_node::Node + Send + Sync + 'static, +{ + metadata_files .iter() .flat_map( |MetadataFile { @@ -198,188 +226,159 @@ where } None => true, }) - .collect::>(); - - let metadata_case_status = Arc::new(RwLock::new(test_cases.iter().fold( - HashMap::<_, HashMap<_, _>>::new(), - |mut map, (path, _, case_idx, case, solc_mode)| { - map.entry((path.to_path_buf(), solc_mode.clone())) - .or_default() - .insert((CaseIdx::new(*case_idx), case.name.clone()), None::); - map - }, - ))); - let status_reporter_task = { - let metadata_case_status = metadata_case_status.clone(); - let start = Instant::now(); - async move { - const GREEN: &str = "\x1B[32m"; - const RED: &str = "\x1B[31m"; - const RESET: &str = "\x1B[0m"; - - let mut entries_to_delete = Vec::new(); - let mut number_of_successes = 0; - let mut number_of_failures = 0; - loop { - let metadata_case_status_read = metadata_case_status.read().await; - if metadata_case_status_read.is_empty() { - break; - } - - for ((metadata_file_path, solc_mode), case_status) in - metadata_case_status_read.iter() - { - if case_status.values().any(|value| value.is_none()) { - continue; - } - - let contains_failures = case_status - .values() - .any(|value| value.is_some_and(|value| !value)); - - if !contains_failures { - eprintln!( - "{}Succeeded:{} {} - {:?}", - GREEN, - RESET, - metadata_file_path.display(), - solc_mode - ) - } else { - eprintln!( - "{}Failed:{} {} - {:?}", - RED, - RESET, - metadata_file_path.display(), - solc_mode - ) - }; - - number_of_successes += case_status - .values() - .filter(|value| value.is_some_and(|value| value)) - .count(); - number_of_failures += case_status - .values() - .filter(|value| value.is_some_and(|value| !value)) - .count(); - - let mut case_status = case_status - .iter() - .map(|((case_idx, case_name), case_status)| { - (case_idx.into_inner(), case_name, case_status.unwrap()) - }) - .collect::>(); - case_status.sort_by(|a, b| a.0.cmp(&b.0)); - for (case_idx, case_name, case_status) in case_status.into_iter() { - if case_status { - eprintln!( - " {GREEN}Case Succeeded:{RESET} {} - Case Idx: {case_idx}", - case_name - .as_ref() - .map(|string| string.as_str()) - .unwrap_or("Unnamed case") - ) - } else { - eprintln!( - " {RED}Case Failed:{RESET} {} - Case Idx: {case_idx}", - case_name - .as_ref() - .map(|string| string.as_str()) - .unwrap_or("Unnamed case") - ) - }; - } - eprintln!(); - - entries_to_delete.push((metadata_file_path.clone(), solc_mode.clone())); - } - - drop(metadata_case_status_read); - let mut metadata_case_status_write = metadata_case_status.write().await; - for entry in entries_to_delete.drain(..) { - metadata_case_status_write.remove(&entry); - } - - tokio::time::sleep(std::time::Duration::from_secs(3)).await; + .map(|(metadata_file_path, metadata, case_idx, case, solc_mode)| { + Test { + metadata: metadata.clone(), + path: metadata_file_path.to_path_buf(), + mode: solc_mode, + case_idx, + case: case.clone(), } + }) +} - let elapsed = start.elapsed(); - eprintln!( - "{GREEN}{}{RESET} cases succeeded, {RED}{}{RESET} cases failed in {} seconds", - number_of_successes, - number_of_failures, - elapsed.as_secs() - ); - } - }; - +fn start_driver_task( + args: &Arguments, + tests: impl Iterator, + span: Span, + report_tx: mpsc::UnboundedSender<(Test, CaseResult)>, +) -> anyhow::Result> +where + L: Platform, + F: Platform, + L::Blockchain: revive_dt_node::Node + Send + Sync + 'static, + F::Blockchain: revive_dt_node::Node + Send + Sync + 'static, +{ + let leader_nodes = Arc::new(NodePool::::new(args)?); + let follower_nodes = Arc::new(NodePool::::new(args)?); let compilation_cache = Arc::new(RwLock::new(HashMap::new())); - let driver_task = futures::stream::iter(test_cases).for_each_concurrent( - None, - |(metadata_file_path, metadata, case_idx, case, solc_mode)| { + let number_concurrent_tasks = args.number_of_concurrent_tasks(); + + Ok(futures::stream::iter(tests).for_each_concurrent( + // We want to limit the concurrent tasks here because: + // + // 1. We don't want to overwhelm the nodes with too many requests, leading to responses timing out. + // 2. We don't want to open too many files at once, leading to the OS running out of file descriptors. + // + // By default, we allow maximum of 10 ongoing requests per node in order to limit (1), and assume that + // this number will automatically be low enough to address (2). The user can override this. + Some(number_concurrent_tasks), + move |test| { + let leader_nodes = leader_nodes.clone(); + let follower_nodes = follower_nodes.clone(); let compilation_cache = compilation_cache.clone(); - let leader_node = leader_nodes.round_robbin(); - let follower_node = follower_nodes.round_robbin(); - let tracing_span = tracing::span!( - Level::INFO, - "Running driver", - metadata_file_path = %metadata_file_path.display(), - case_idx = case_idx, - solc_mode = ?solc_mode, - ); - let metadata_case_status = metadata_case_status.clone(); + let report_tx = report_tx.clone(); + async move { + let leader_node = leader_nodes.round_robbin(); + let follower_node = follower_nodes.round_robbin(); + + let tracing_span = tracing::span!( + Level::INFO, + "Running driver", + metadata_file_path = %test.path.display(), + case_idx = ?test.case_idx, + solc_mode = ?test.mode, + ); + let result = handle_case_driver::( - metadata_file_path.as_path(), - metadata, - case_idx.into(), - case, - solc_mode.clone(), + &test.path, + &test.metadata, + test.case_idx.into(), + &test.case, + test.mode.clone(), args, compilation_cache.clone(), leader_node, follower_node, span, ) + .instrument(tracing_span) .await; - let mut metadata_case_status = metadata_case_status.write().await; - match result { - Ok(inputs_executed) => { - tracing::info!(inputs_executed, "Execution succeeded"); - metadata_case_status - .entry((metadata_file_path.clone(), solc_mode)) - .or_default() - .insert((CaseIdx::new(case_idx), case.name.clone()), Some(true)); - } - Err(error) => { - metadata_case_status - .entry((metadata_file_path.clone(), solc_mode)) - .or_default() - .insert((CaseIdx::new(case_idx), case.name.clone()), Some(false)); - tracing::error!(%error, "Execution failed") - } - } - tracing::info!("Execution completed"); + + report_tx + .send((test, result)) + .expect("Failed to send report"); } - .instrument(tracing_span) }, + )) +} + +async fn start_reporter_task(mut report_rx: mpsc::UnboundedReceiver<(Test, CaseResult)>) { + let start = Instant::now(); + + const GREEN: &str = "\x1B[32m"; + const RED: &str = "\x1B[31m"; + const COLOUR_RESET: &str = "\x1B[0m"; + const BOLD: &str = "\x1B[1m"; + const BOLD_RESET: &str = "\x1B[22m"; + + let mut number_of_successes = 0; + let mut number_of_failures = 0; + let mut failures = vec![]; + + // Wait for reports to come from our test runner. When the channel closes, this ends. + while let Some((test, case_result)) = report_rx.recv().await { + let case_name = test.case.name.as_deref().unwrap_or("unnamed_case"); + let case_idx = test.case_idx; + let test_path = test.path.display(); + let test_mode = test.mode.clone(); + + match case_result { + Ok(_inputs) => { + number_of_successes += 1; + eprintln!( + "{GREEN}Case Succeeded:{COLOUR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode:?})" + ); + } + Err(err) => { + number_of_failures += 1; + eprintln!( + "{RED}Case Failed:{COLOUR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode:?})" + ); + failures.push((test, err)); + } + } + } + + eprintln!(); + let elapsed = start.elapsed(); + + // Now, log the failures with more complete errors at the bottom, like `cargo test` does, so + // that we don't have to scroll through the entire output to find them. + if !failures.is_empty() { + eprintln!("{BOLD}Failures:{BOLD_RESET}\n"); + + for failure in failures { + let (test, err) = failure; + let case_name = test.case.name.as_deref().unwrap_or("unnamed_case"); + let case_idx = test.case_idx; + let test_path = test.path.display(); + let test_mode = test.mode.clone(); + + eprintln!( + "---- {RED}Case Failed:{COLOUR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode:?}) ----\n\n{err}\n" + ); + } + } + + // Summary at the end. + eprintln!( + "{} cases: {GREEN}{number_of_successes}{COLOUR_RESET} cases succeeded, {RED}{number_of_failures}{COLOUR_RESET} cases failed in {} seconds", + number_of_successes + number_of_failures, + elapsed.as_secs() ); - - tokio::join!(status_reporter_task, driver_task); - - Ok(()) } #[allow(clippy::too_many_arguments)] -async fn handle_case_driver<'a, L, F>( - metadata_file_path: &'a Path, - metadata: &'a Metadata, +async fn handle_case_driver( + metadata_file_path: &Path, + metadata: &Metadata, case_idx: CaseIdx, case: &Case, mode: SolcMode, config: &Arguments, - compilation_cache: CompilationCache<'a>, + compilation_cache: CompilationCache, leader_node: &L::Blockchain, follower_node: &F::Blockchain, _: Span, @@ -520,11 +519,9 @@ where ); let Some(leader_library_address) = leader_receipt.contract_address else { - tracing::error!("Contract deployment transaction didn't return an address"); anyhow::bail!("Contract deployment didn't return an address"); }; let Some(follower_library_address) = follower_receipt.contract_address else { - tracing::error!("Contract deployment transaction didn't return an address"); anyhow::bail!("Contract deployment didn't return an address"); }; @@ -554,8 +551,16 @@ where .any(|(code, _)| !code.chars().all(|char| char.is_ascii_hexdigit())); let (leader_compiled_contracts, follower_compiled_contracts) = if metadata_file_contains_libraries && compiled_contracts_require_linking { - let leader_key = (metadata_file_path, mode.clone(), L::config_id()); - let follower_key = (metadata_file_path, mode.clone(), L::config_id()); + let leader_key = ( + metadata_file_path.to_path_buf(), + mode.clone(), + L::config_id(), + ); + let follower_key = ( + metadata_file_path.to_path_buf(), + mode.clone(), + F::config_id(), + ); { let mut cache = compilation_cache.write().await; cache.remove(&leader_key); @@ -609,15 +614,19 @@ where driver.execute().await } -async fn get_or_build_contracts<'a, P: Platform>( - metadata: &'a Metadata, - metadata_file_path: &'a Path, +async fn get_or_build_contracts( + metadata: &Metadata, + metadata_file_path: &Path, mode: SolcMode, config: &Arguments, - compilation_cache: CompilationCache<'a>, + compilation_cache: CompilationCache, deployed_libraries: &HashMap, ) -> anyhow::Result> { - let key = (metadata_file_path, mode.clone(), P::config_id()); + let key = ( + metadata_file_path.to_path_buf(), + mode.clone(), + P::config_id(), + ); if let Some(compilation_artifact) = compilation_cache.read().await.get(&key).cloned() { let mut compilation_artifact = compilation_artifact.lock().await; match *compilation_artifact {