diff --git a/Cargo.lock b/Cargo.lock index a04261a..9372616 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4589,12 +4589,16 @@ name = "revive-dt-report" version = "0.1.0" dependencies = [ "anyhow", + "paste", "revive-dt-common", "revive-dt-compiler", "revive-dt-config", "revive-dt-format", + "semver 1.0.26", "serde", "serde_json", + "serde_with", + "tokio", ] [[package]] @@ -4845,6 +4849,30 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "schemars" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd191f9397d57d581cddd31014772520aa448f65ef991055d7f61582c65165f" +dependencies = [ + "dyn-clone", + "ref-cast", + "serde", + "serde_json", +] + +[[package]] +name = "schemars" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" +dependencies = [ + "dyn-clone", + "ref-cast", + "serde", + "serde_json", +] + [[package]] name = "schnellru" version = "0.2.4" @@ -5075,15 +5103,17 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.12.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa" +checksum = "f2c45cd61fefa9db6f254525d46e392b852e0e61d9a1fd36e5bd183450a556d5" dependencies = [ "base64 0.22.1", "chrono", "hex", "indexmap 1.9.3", "indexmap 2.10.0", + "schemars 0.9.0", + "schemars 1.0.4", "serde", "serde_derive", "serde_json", @@ -5093,9 +5123,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.12.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e" +checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f" dependencies = [ "darling", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index d8b4213..fb7cbcf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ futures = { version = "0.3.31" } hex = "0.4.3" regex = "1" moka = "0.12.10" +paste = "1.0.15" reqwest = { version = "0.12.15", features = ["json"] } once_cell = "1.21" semver = { version = "1.0", features = ["serde"] } @@ -43,6 +44,7 @@ serde_json = { version = "1.0", default-features = false, features = [ "std", "unbounded_depth", ] } +serde_with = { version = "3.14.0" } sha2 = { version = "0.10.9" } sp-core = "36.1.0" sp-runtime = "41.1.0" diff --git a/crates/core/src/main.rs b/crates/core/src/main.rs index 248e4be..a3c5f56 100644 --- a/crates/core/src/main.rs +++ b/crates/core/src/main.rs @@ -14,12 +14,13 @@ use alloy::{ }; use anyhow::Context; use clap::Parser; -use futures::stream; +use futures::{FutureExt, stream}; use futures::{Stream, StreamExt}; use indexmap::IndexMap; use revive_dt_node_interaction::EthereumNode; +use revive_dt_report::{ReportAggregator, Reporter}; use temp_dir::TempDir; -use tokio::{sync::mpsc, try_join}; +use tokio::{join, sync::mpsc, try_join}; use tracing::{debug, info, info_span, instrument}; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{EnvFilter, FmtSubscriber}; @@ -68,12 +69,32 @@ fn main() -> anyhow::Result<()> { "Differential testing tool has been initialized" ); + let (reporter, report_aggregator_task) = ReportAggregator::new(args.clone()).into_task(); + let body = async { - for (_, tests) in collect_corpora(&args)? { - match &args.compile_only { - Some(platform) => compile_corpus(&args, &tests, platform).await, - None => execute_corpus(&args, &tests).await?, + let tests = collect_corpora(&args)? + .into_iter() + .inspect(|(corpus, _)| { + reporter + .report_corpus_file_discovery_event(corpus.clone()) + .expect("Can't fail") + }) + .flat_map(|(_, files)| files.into_iter()) + .inspect(|metadata_file| { + reporter + .report_metadata_file_discovery_event( + metadata_file.metadata_file_path.as_path(), + metadata_file.content.clone(), + ) + .expect("Can't fail") + }) + .collect::>(); + + match &args.compile_only { + Some(platform) => { + compile_corpus(&args, &tests, platform, reporter, report_aggregator_task).await } + None => execute_corpus(&args, &tests, reporter, report_aggregator_task).await?, } Ok(()) }; @@ -147,7 +168,12 @@ fn collect_corpora(args: &Arguments) -> anyhow::Result(args: &Arguments, metadata_files: &[MetadataFile]) -> anyhow::Result<()> +async fn run_driver( + args: &Arguments, + metadata_files: &[MetadataFile], + reporter: Reporter, + report_aggregator_task: impl Future>, +) -> anyhow::Result<()> where L: Platform, F: Platform, @@ -157,10 +183,17 @@ where let (report_tx, report_rx) = mpsc::unbounded_channel::<(Test<'_>, CaseResult)>(); let tests = prepare_tests::(args, metadata_files); - let driver_task = start_driver_task::(args, tests, report_tx).await?; + let driver_task = start_driver_task::(args, tests, report_tx) + .await? + .inspect(|_| { + reporter + .report_execution_completed_event() + .expect("Failed to inform the report aggregator of the task finishing") + }); let status_reporter_task = start_reporter_task(report_rx); - tokio::join!(status_reporter_task, driver_task); + let (_, _, rtn) = tokio::join!(status_reporter_task, driver_task, report_aggregator_task); + rtn?; Ok(()) } @@ -387,7 +420,7 @@ async fn start_reporter_task(mut report_rx: mpsc::UnboundedReceiver<(Test<'_>, C const GREEN: &str = "\x1B[32m"; const RED: &str = "\x1B[31m"; - const COLOUR_RESET: &str = "\x1B[0m"; + const COLOR_RESET: &str = "\x1B[0m"; const BOLD: &str = "\x1B[1m"; const BOLD_RESET: &str = "\x1B[22m"; @@ -408,14 +441,14 @@ async fn start_reporter_task(mut report_rx: mpsc::UnboundedReceiver<(Test<'_>, C number_of_successes += 1; let _ = writeln!( buf, - "{GREEN}Case Succeeded:{COLOUR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode})" + "{GREEN}Case Succeeded:{COLOR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode})" ); } Err(err) => { number_of_failures += 1; let _ = writeln!( buf, - "{RED}Case Failed:{COLOUR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode})" + "{RED}Case Failed:{COLOR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode})" ); failures.push((test, err)); } @@ -439,7 +472,7 @@ async fn start_reporter_task(mut report_rx: mpsc::UnboundedReceiver<(Test<'_>, C let _ = writeln!( buf, - "---- {RED}Case Failed:{COLOUR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode}) ----\n\n{err}\n" + "---- {RED}Case Failed:{COLOR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode}) ----\n\n{err}\n" ); } } @@ -447,7 +480,7 @@ async fn start_reporter_task(mut report_rx: mpsc::UnboundedReceiver<(Test<'_>, C // Summary at the end. let _ = writeln!( buf, - "{} cases: {GREEN}{number_of_successes}{COLOUR_RESET} cases succeeded, {RED}{number_of_failures}{COLOUR_RESET} cases failed in {} seconds", + "{} cases: {GREEN}{number_of_successes}{COLOR_RESET} cases succeeded, {RED}{number_of_failures}{COLOR_RESET} cases failed in {} seconds", number_of_successes + number_of_failures, elapsed.as_secs() ); @@ -670,13 +703,18 @@ where .inspect(|steps_executed| info!(steps_executed, "Case succeeded")) } -async fn execute_corpus(args: &Arguments, tests: &[MetadataFile]) -> anyhow::Result<()> { +async fn execute_corpus( + args: &Arguments, + tests: &[MetadataFile], + reporter: Reporter, + report_aggregator_task: impl Future>, +) -> anyhow::Result<()> { match (&args.leader, &args.follower) { (TestingPlatform::Geth, TestingPlatform::Kitchensink) => { - run_driver::(args, tests).await? + run_driver::(args, tests, reporter, report_aggregator_task).await? } (TestingPlatform::Geth, TestingPlatform::Geth) => { - run_driver::(args, tests).await? + run_driver::(args, tests, reporter, report_aggregator_task).await? } _ => unimplemented!(), } @@ -684,7 +722,13 @@ async fn execute_corpus(args: &Arguments, tests: &[MetadataFile]) -> anyhow::Res Ok(()) } -async fn compile_corpus(config: &Arguments, tests: &[MetadataFile], platform: &TestingPlatform) { +async fn compile_corpus( + config: &Arguments, + tests: &[MetadataFile], + platform: &TestingPlatform, + _: Reporter, + report_aggregator_task: impl Future>, +) { let tests = tests.iter().flat_map(|metadata| { metadata .solc_modes() @@ -698,8 +742,8 @@ async fn compile_corpus(config: &Arguments, tests: &[MetadataFile], platform: &T .map(Arc::new) .expect("Failed to create the cached compiler"); - futures::stream::iter(tests) - .for_each_concurrent(None, |(metadata, mode)| { + let compilation_task = + futures::stream::iter(tests).for_each_concurrent(None, |(metadata, mode)| { let cached_compiler = cached_compiler.clone(); async move { @@ -728,6 +772,6 @@ async fn compile_corpus(config: &Arguments, tests: &[MetadataFile], platform: &T } } } - }) - .await; + }); + let _ = join!(compilation_task, report_aggregator_task); } diff --git a/crates/format/src/case.rs b/crates/format/src/case.rs index 2ef9ead..333d8e4 100644 --- a/crates/format/src/case.rs +++ b/crates/format/src/case.rs @@ -71,6 +71,7 @@ impl Case { define_wrapper_type!( /// A wrapper type for the index of test cases found in metadata file. - #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] + #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] + #[serde(transparent)] pub struct CaseIdx(usize) impl Display; ); diff --git a/crates/report/Cargo.toml b/crates/report/Cargo.toml index 0e6e896..ac05025 100644 --- a/crates/report/Cargo.toml +++ b/crates/report/Cargo.toml @@ -14,8 +14,12 @@ revive-dt-format = { workspace = true } revive-dt-compiler = { workspace = true } anyhow = { workspace = true } +paste = { workspace = true } +semver = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +serde_with = { workspace = true } +tokio = { workspace = true } [lints] workspace = true diff --git a/crates/report/src/aggregator.rs b/crates/report/src/aggregator.rs new file mode 100644 index 0000000..21ba45b --- /dev/null +++ b/crates/report/src/aggregator.rs @@ -0,0 +1,121 @@ +//! Implementation of the report aggregator task which consumes the events sent by the various +//! reporters and combines them into a single unified report. + +use std::{ + collections::BTreeSet, + fs::OpenOptions, + path::PathBuf, + time::{SystemTime, UNIX_EPOCH}, +}; + +use anyhow::Result; +use revive_dt_config::Arguments; +use revive_dt_format::corpus::Corpus; +use serde::Serialize; +use tokio::sync::{ + broadcast::{Sender, channel}, + mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, +}; + +use crate::{ + SubscribeToEventsEvent, + reporter_event::ReporterEvent, + runner_event::{CorpusFileDiscoveryEvent, MetadataFileDiscoveryEvent, Reporter, RunnerEvent}, +}; + +pub struct ReportAggregator { + /* Internal Report State */ + report: Report, + /* Channels */ + runner_tx: Option>, + runner_rx: UnboundedReceiver, + listener_tx: Sender, +} + +impl ReportAggregator { + pub fn new(config: Arguments) -> Self { + let (runner_tx, runner_rx) = unbounded_channel::(); + let (listener_tx, _) = channel::(1024); + Self { + report: Report::new(config), + runner_tx: Some(runner_tx), + runner_rx, + listener_tx, + } + } + + pub fn into_task(mut self) -> (Reporter, impl Future>) { + let reporter = self + .runner_tx + .take() + .map(Into::into) + .expect("Can't fail since this can only be called once"); + (reporter, async move { self.aggregate().await }) + } + + async fn aggregate(mut self) -> Result<()> { + while let Some(event) = self.runner_rx.recv().await { + match event { + RunnerEvent::ExecutionCompleted(..) => break, + RunnerEvent::SubscribeToEvents(event) => { + self.handle_subscribe_to_events_event(*event); + } + RunnerEvent::CorpusFileDiscovery(event) => { + self.handle_corpus_file_discovered_event(*event) + } + RunnerEvent::MetadataFileDiscovery(event) => { + self.handle_metadata_file_discovery_event(*event); + } + } + } + + let file_name = { + let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); + let mut file_name = current_timestamp.to_string(); + file_name.push_str(".json"); + file_name + }; + let file_path = self.report.config.directory().join(file_name); + let file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .read(false) + .open(file_path)?; + serde_json::to_writer_pretty(file, &self.report)?; + + Ok(()) + } + + fn handle_subscribe_to_events_event(&self, event: SubscribeToEventsEvent) { + let _ = event.tx.send(self.listener_tx.subscribe()); + } + + fn handle_corpus_file_discovered_event(&mut self, event: CorpusFileDiscoveryEvent) { + self.report.corpora.push(event.corpus); + } + + fn handle_metadata_file_discovery_event(&mut self, event: MetadataFileDiscoveryEvent) { + self.report.metadata_files.insert(event.path); + } +} + +#[derive(Clone, Debug, Serialize)] +pub struct Report { + /// The configuration that the tool was started up with. + pub config: Arguments, + /// The list of corpus files that the tool found. + pub corpora: Vec, + /// The list of metadata files that were found by the tool. + pub metadata_files: BTreeSet, +} + +impl Report { + pub fn new(config: Arguments) -> Self { + Self { + config, + corpora: Default::default(), + metadata_files: Default::default(), + } + } +} diff --git a/crates/report/src/common.rs b/crates/report/src/common.rs new file mode 100644 index 0000000..0f21c13 --- /dev/null +++ b/crates/report/src/common.rs @@ -0,0 +1,24 @@ +//! Common types and functions used throughout the crate. + +use std::path::PathBuf; + +use revive_dt_common::define_wrapper_type; +use revive_dt_compiler::Mode; +use revive_dt_format::case::CaseIdx; +use serde::{Deserialize, Serialize}; +use serde_with::{DisplayFromStr, serde_as}; + +define_wrapper_type!( + #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] + pub struct MetadataFilePath(PathBuf); +); + +/// An absolute specifier for a test. +#[serde_as] +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize)] +pub struct TestSpecifier { + #[serde_as(as = "DisplayFromStr")] + pub solc_mode: Mode, + pub metadata_file_path: PathBuf, + pub case_idx: CaseIdx, +} diff --git a/crates/report/src/lib.rs b/crates/report/src/lib.rs index d104cba..dc66f46 100644 --- a/crates/report/src/lib.rs +++ b/crates/report/src/lib.rs @@ -1 +1,10 @@ //! This crate implements the reporting infrastructure for the differential testing tool. + +mod aggregator; +mod common; +mod reporter_event; +mod runner_event; + +pub use aggregator::*; +pub use reporter_event::*; +pub use runner_event::*; diff --git a/crates/report/src/reporter_event.rs b/crates/report/src/reporter_event.rs new file mode 100644 index 0000000..40d1a0b --- /dev/null +++ b/crates/report/src/reporter_event.rs @@ -0,0 +1,4 @@ +//! A reporter event sent by the report aggregator to the various listeners. + +#[derive(Clone, Debug)] +pub enum ReporterEvent {} diff --git a/crates/report/src/runner_event.rs b/crates/report/src/runner_event.rs new file mode 100644 index 0000000..5ad26da --- /dev/null +++ b/crates/report/src/runner_event.rs @@ -0,0 +1,147 @@ +//! The types associated with the events sent by the runner to the reporter. + +use std::path::PathBuf; + +use revive_dt_format::corpus::Corpus; +use revive_dt_format::metadata::Metadata; +use tokio::sync::{broadcast, oneshot}; + +use crate::ReporterEvent; + +macro_rules! keep_if_doc { + (#[doc = $doc:expr]) => { + #[doc = $doc] + }; + ( $($_:tt)* ) => {}; +} + +/// Defines the runner-event which is sent from the test runners to the report aggregator. +/// +/// This macro defines a number of things related to the reporting infrastructure and the interface +/// used. First of all, it defines the enum of all of the possible events that the runners can send +/// to the aggregator. For each one of the variants it defines a separate struct for it to allow the +/// variant field in the enum to be put in a [`Box`]. +/// +/// In addition to the above, it defines [`From`] implementations for the various event types for +/// the [`RunnerEvent`] enum essentially allowing for events such as [`CorpusFileDiscoveryEvent`] to +/// be converted into a [`RunnerEvent`]. +/// +/// In addition to the above, it also defines the [`RunnerEventReporter`] which is a wrapper around +/// an [`UnboundedSender`] allowing for events to be sent to the report aggregator. +/// +/// With the above description, we can see that this macro defines almost all of the interface of +/// the reporting infrastructure, from the enum itself, to its associated types, and also to the +/// reporter that's used to report events to the aggregator. +/// +/// [`UnboundedSender`]: tokio::sync::mpsc::UnboundedSender +macro_rules! define_event { + ( + $(#[$enum_meta: meta])* + $vis: vis enum $ident: ident { + $( + $(#[$variant_meta: meta])* + $variant_ident: ident { + $( + $(#[$field_meta: meta])* + $field_ident: ident: $field_ty: ty + ),* $(,)? + } + ),* $(,)? + } + ) => { + paste::paste! { + $(#[$enum_meta])* + $vis enum $ident { + $( + $(#[$variant_meta])* + $variant_ident(Box<[<$variant_ident Event>]>) + ),* + } + + $( + $(#[$variant_meta])* + $vis struct [<$variant_ident Event>] { + $( + $(#[$field_meta])* + $vis $field_ident: $field_ty + ),* + } + )* + + $( + impl From<[<$variant_ident Event>]> for $ident { + fn from(value: [<$variant_ident Event>]) -> Self { + Self::$variant_ident(Box::new(value)) + } + } + )* + + /// Provides a way to report events to the aggregator. + /// + /// Under the hood, this is a wrapper around an [`UnboundedSender`] which abstracts away + /// the fact that channels are used and that implements high-level methods for reporting + /// various events to the aggregator. + #[derive(Clone, Debug)] + pub struct [< $ident Reporter >]($vis tokio::sync::mpsc::UnboundedSender<$ident>); + + impl From> for [< $ident Reporter >] { + fn from(value: tokio::sync::mpsc::UnboundedSender<$ident>) -> Self { + Self(value) + } + } + + impl [< $ident Reporter >] { + fn report(&self, event: impl Into<$ident>) -> anyhow::Result<()> { + self.0.send(event.into()).map_err(Into::into) + } + + $( + keep_if_doc!($(#[$variant_meta])*); + pub fn [< report_ $variant_ident:snake _event >](&self, $($field_ident: impl Into<$field_ty>),*) -> anyhow::Result<()> { + self.report([< $variant_ident Event >] { + $($field_ident: $field_ident.into()),* + }) + } + )* + } + } + }; +} + +define_event! { + /// An event type that's sent by the test runners/drivers to the report aggregator. + pub(crate) enum RunnerEvent { + /// An event emitted by the reporter when it wishes to listen to events emitted by the + /// aggregator. + SubscribeToEvents { + /// The channel that the aggregator is to send the receive side of the channel on. + tx: oneshot::Sender> + }, + /// An event emitted by runners when they've discovered a corpus file. + CorpusFileDiscovery { + /// The contents of the corpus file. + corpus: Corpus + }, + /// An event emitted by runners when they've discovered a metadata file. + MetadataFileDiscovery { + /// The path of the metadata file discovered. + path: PathBuf, + /// The content of the metadata file. + metadata: Metadata + }, + /// An event emitted by the runners when the execution is completed and the aggregator can + /// stop. + ExecutionCompleted {} + } +} + +/// An extension to the [`Reporter`] implemented by the macro. +impl RunnerEventReporter { + pub async fn subscribe(&self) -> anyhow::Result> { + let (tx, rx) = oneshot::channel::>(); + self.report_subscribe_to_events_event(tx)?; + rx.await.map_err(Into::into) + } +} + +pub type Reporter = RunnerEventReporter;