Implement the initial set of reporter events

This commit is contained in:
Omar Abdulla
2025-08-24 19:45:01 +03:00
parent bd5ff6d75e
commit cbb6a11404
10 changed files with 413 additions and 27 deletions
Generated
+34 -4
View File
@@ -4589,12 +4589,16 @@ name = "revive-dt-report"
version = "0.1.0"
dependencies = [
"anyhow",
"paste",
"revive-dt-common",
"revive-dt-compiler",
"revive-dt-config",
"revive-dt-format",
"semver 1.0.26",
"serde",
"serde_json",
"serde_with",
"tokio",
]
[[package]]
@@ -4845,6 +4849,30 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "schemars"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cd191f9397d57d581cddd31014772520aa448f65ef991055d7f61582c65165f"
dependencies = [
"dyn-clone",
"ref-cast",
"serde",
"serde_json",
]
[[package]]
name = "schemars"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0"
dependencies = [
"dyn-clone",
"ref-cast",
"serde",
"serde_json",
]
[[package]]
name = "schnellru"
version = "0.2.4"
@@ -5075,15 +5103,17 @@ dependencies = [
[[package]]
name = "serde_with"
version = "3.12.0"
version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa"
checksum = "f2c45cd61fefa9db6f254525d46e392b852e0e61d9a1fd36e5bd183450a556d5"
dependencies = [
"base64 0.22.1",
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.10.0",
"schemars 0.9.0",
"schemars 1.0.4",
"serde",
"serde_derive",
"serde_json",
@@ -5093,9 +5123,9 @@ dependencies = [
[[package]]
name = "serde_with_macros"
version = "3.12.0"
version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e"
checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f"
dependencies = [
"darling",
"proc-macro2",
+2
View File
@@ -34,6 +34,7 @@ futures = { version = "0.3.31" }
hex = "0.4.3"
regex = "1"
moka = "0.12.10"
paste = "1.0.15"
reqwest = { version = "0.12.15", features = ["json"] }
once_cell = "1.21"
semver = { version = "1.0", features = ["serde"] }
@@ -43,6 +44,7 @@ serde_json = { version = "1.0", default-features = false, features = [
"std",
"unbounded_depth",
] }
serde_with = { version = "3.14.0" }
sha2 = { version = "0.10.9" }
sp-core = "36.1.0"
sp-runtime = "41.1.0"
+66 -22
View File
@@ -14,12 +14,13 @@ use alloy::{
};
use anyhow::Context;
use clap::Parser;
use futures::stream;
use futures::{FutureExt, stream};
use futures::{Stream, StreamExt};
use indexmap::IndexMap;
use revive_dt_node_interaction::EthereumNode;
use revive_dt_report::{ReportAggregator, Reporter};
use temp_dir::TempDir;
use tokio::{sync::mpsc, try_join};
use tokio::{join, sync::mpsc, try_join};
use tracing::{debug, info, info_span, instrument};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{EnvFilter, FmtSubscriber};
@@ -68,12 +69,32 @@ fn main() -> anyhow::Result<()> {
"Differential testing tool has been initialized"
);
let (reporter, report_aggregator_task) = ReportAggregator::new(args.clone()).into_task();
let body = async {
for (_, tests) in collect_corpora(&args)? {
match &args.compile_only {
Some(platform) => compile_corpus(&args, &tests, platform).await,
None => execute_corpus(&args, &tests).await?,
let tests = collect_corpora(&args)?
.into_iter()
.inspect(|(corpus, _)| {
reporter
.report_corpus_file_discovery_event(corpus.clone())
.expect("Can't fail")
})
.flat_map(|(_, files)| files.into_iter())
.inspect(|metadata_file| {
reporter
.report_metadata_file_discovery_event(
metadata_file.metadata_file_path.as_path(),
metadata_file.content.clone(),
)
.expect("Can't fail")
})
.collect::<Vec<_>>();
match &args.compile_only {
Some(platform) => {
compile_corpus(&args, &tests, platform, reporter, report_aggregator_task).await
}
None => execute_corpus(&args, &tests, reporter, report_aggregator_task).await?,
}
Ok(())
};
@@ -147,7 +168,12 @@ fn collect_corpora(args: &Arguments) -> anyhow::Result<HashMap<Corpus, Vec<Metad
Ok(corpora)
}
async fn run_driver<L, F>(args: &Arguments, metadata_files: &[MetadataFile]) -> anyhow::Result<()>
async fn run_driver<L, F>(
args: &Arguments,
metadata_files: &[MetadataFile],
reporter: Reporter,
report_aggregator_task: impl Future<Output = anyhow::Result<()>>,
) -> anyhow::Result<()>
where
L: Platform,
F: Platform,
@@ -157,10 +183,17 @@ where
let (report_tx, report_rx) = mpsc::unbounded_channel::<(Test<'_>, CaseResult)>();
let tests = prepare_tests::<L, F>(args, metadata_files);
let driver_task = start_driver_task::<L, F>(args, tests, report_tx).await?;
let driver_task = start_driver_task::<L, F>(args, tests, report_tx)
.await?
.inspect(|_| {
reporter
.report_execution_completed_event()
.expect("Failed to inform the report aggregator of the task finishing")
});
let status_reporter_task = start_reporter_task(report_rx);
tokio::join!(status_reporter_task, driver_task);
let (_, _, rtn) = tokio::join!(status_reporter_task, driver_task, report_aggregator_task);
rtn?;
Ok(())
}
@@ -387,7 +420,7 @@ async fn start_reporter_task(mut report_rx: mpsc::UnboundedReceiver<(Test<'_>, C
const GREEN: &str = "\x1B[32m";
const RED: &str = "\x1B[31m";
const COLOUR_RESET: &str = "\x1B[0m";
const COLOR_RESET: &str = "\x1B[0m";
const BOLD: &str = "\x1B[1m";
const BOLD_RESET: &str = "\x1B[22m";
@@ -408,14 +441,14 @@ async fn start_reporter_task(mut report_rx: mpsc::UnboundedReceiver<(Test<'_>, C
number_of_successes += 1;
let _ = writeln!(
buf,
"{GREEN}Case Succeeded:{COLOUR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode})"
"{GREEN}Case Succeeded:{COLOR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode})"
);
}
Err(err) => {
number_of_failures += 1;
let _ = writeln!(
buf,
"{RED}Case Failed:{COLOUR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode})"
"{RED}Case Failed:{COLOR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode})"
);
failures.push((test, err));
}
@@ -439,7 +472,7 @@ async fn start_reporter_task(mut report_rx: mpsc::UnboundedReceiver<(Test<'_>, C
let _ = writeln!(
buf,
"---- {RED}Case Failed:{COLOUR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode}) ----\n\n{err}\n"
"---- {RED}Case Failed:{COLOR_RESET} {test_path} -> {case_name}:{case_idx} (mode: {test_mode}) ----\n\n{err}\n"
);
}
}
@@ -447,7 +480,7 @@ async fn start_reporter_task(mut report_rx: mpsc::UnboundedReceiver<(Test<'_>, C
// Summary at the end.
let _ = writeln!(
buf,
"{} cases: {GREEN}{number_of_successes}{COLOUR_RESET} cases succeeded, {RED}{number_of_failures}{COLOUR_RESET} cases failed in {} seconds",
"{} cases: {GREEN}{number_of_successes}{COLOR_RESET} cases succeeded, {RED}{number_of_failures}{COLOR_RESET} cases failed in {} seconds",
number_of_successes + number_of_failures,
elapsed.as_secs()
);
@@ -670,13 +703,18 @@ where
.inspect(|steps_executed| info!(steps_executed, "Case succeeded"))
}
async fn execute_corpus(args: &Arguments, tests: &[MetadataFile]) -> anyhow::Result<()> {
async fn execute_corpus(
args: &Arguments,
tests: &[MetadataFile],
reporter: Reporter,
report_aggregator_task: impl Future<Output = anyhow::Result<()>>,
) -> anyhow::Result<()> {
match (&args.leader, &args.follower) {
(TestingPlatform::Geth, TestingPlatform::Kitchensink) => {
run_driver::<Geth, Kitchensink>(args, tests).await?
run_driver::<Geth, Kitchensink>(args, tests, reporter, report_aggregator_task).await?
}
(TestingPlatform::Geth, TestingPlatform::Geth) => {
run_driver::<Geth, Geth>(args, tests).await?
run_driver::<Geth, Geth>(args, tests, reporter, report_aggregator_task).await?
}
_ => unimplemented!(),
}
@@ -684,7 +722,13 @@ async fn execute_corpus(args: &Arguments, tests: &[MetadataFile]) -> anyhow::Res
Ok(())
}
async fn compile_corpus(config: &Arguments, tests: &[MetadataFile], platform: &TestingPlatform) {
async fn compile_corpus(
config: &Arguments,
tests: &[MetadataFile],
platform: &TestingPlatform,
_: Reporter,
report_aggregator_task: impl Future<Output = anyhow::Result<()>>,
) {
let tests = tests.iter().flat_map(|metadata| {
metadata
.solc_modes()
@@ -698,8 +742,8 @@ async fn compile_corpus(config: &Arguments, tests: &[MetadataFile], platform: &T
.map(Arc::new)
.expect("Failed to create the cached compiler");
futures::stream::iter(tests)
.for_each_concurrent(None, |(metadata, mode)| {
let compilation_task =
futures::stream::iter(tests).for_each_concurrent(None, |(metadata, mode)| {
let cached_compiler = cached_compiler.clone();
async move {
@@ -728,6 +772,6 @@ async fn compile_corpus(config: &Arguments, tests: &[MetadataFile], platform: &T
}
}
}
})
.await;
});
let _ = join!(compilation_task, report_aggregator_task);
}
+2 -1
View File
@@ -71,6 +71,7 @@ impl Case {
define_wrapper_type!(
/// A wrapper type for the index of test cases found in metadata file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct CaseIdx(usize) impl Display;
);
+4
View File
@@ -14,8 +14,12 @@ revive-dt-format = { workspace = true }
revive-dt-compiler = { workspace = true }
anyhow = { workspace = true }
paste = { workspace = true }
semver = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
tokio = { workspace = true }
[lints]
workspace = true
+121
View File
@@ -0,0 +1,121 @@
//! Implementation of the report aggregator task which consumes the events sent by the various
//! reporters and combines them into a single unified report.
use std::{
collections::BTreeSet,
fs::OpenOptions,
path::PathBuf,
time::{SystemTime, UNIX_EPOCH},
};
use anyhow::Result;
use revive_dt_config::Arguments;
use revive_dt_format::corpus::Corpus;
use serde::Serialize;
use tokio::sync::{
broadcast::{Sender, channel},
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
};
use crate::{
SubscribeToEventsEvent,
reporter_event::ReporterEvent,
runner_event::{CorpusFileDiscoveryEvent, MetadataFileDiscoveryEvent, Reporter, RunnerEvent},
};
pub struct ReportAggregator {
/* Internal Report State */
report: Report,
/* Channels */
runner_tx: Option<UnboundedSender<RunnerEvent>>,
runner_rx: UnboundedReceiver<RunnerEvent>,
listener_tx: Sender<ReporterEvent>,
}
impl ReportAggregator {
pub fn new(config: Arguments) -> Self {
let (runner_tx, runner_rx) = unbounded_channel::<RunnerEvent>();
let (listener_tx, _) = channel::<ReporterEvent>(1024);
Self {
report: Report::new(config),
runner_tx: Some(runner_tx),
runner_rx,
listener_tx,
}
}
pub fn into_task(mut self) -> (Reporter, impl Future<Output = Result<()>>) {
let reporter = self
.runner_tx
.take()
.map(Into::into)
.expect("Can't fail since this can only be called once");
(reporter, async move { self.aggregate().await })
}
async fn aggregate(mut self) -> Result<()> {
while let Some(event) = self.runner_rx.recv().await {
match event {
RunnerEvent::ExecutionCompleted(..) => break,
RunnerEvent::SubscribeToEvents(event) => {
self.handle_subscribe_to_events_event(*event);
}
RunnerEvent::CorpusFileDiscovery(event) => {
self.handle_corpus_file_discovered_event(*event)
}
RunnerEvent::MetadataFileDiscovery(event) => {
self.handle_metadata_file_discovery_event(*event);
}
}
}
let file_name = {
let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
let mut file_name = current_timestamp.to_string();
file_name.push_str(".json");
file_name
};
let file_path = self.report.config.directory().join(file_name);
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.read(false)
.open(file_path)?;
serde_json::to_writer_pretty(file, &self.report)?;
Ok(())
}
fn handle_subscribe_to_events_event(&self, event: SubscribeToEventsEvent) {
let _ = event.tx.send(self.listener_tx.subscribe());
}
fn handle_corpus_file_discovered_event(&mut self, event: CorpusFileDiscoveryEvent) {
self.report.corpora.push(event.corpus);
}
fn handle_metadata_file_discovery_event(&mut self, event: MetadataFileDiscoveryEvent) {
self.report.metadata_files.insert(event.path);
}
}
#[derive(Clone, Debug, Serialize)]
pub struct Report {
/// The configuration that the tool was started up with.
pub config: Arguments,
/// The list of corpus files that the tool found.
pub corpora: Vec<Corpus>,
/// The list of metadata files that were found by the tool.
pub metadata_files: BTreeSet<PathBuf>,
}
impl Report {
pub fn new(config: Arguments) -> Self {
Self {
config,
corpora: Default::default(),
metadata_files: Default::default(),
}
}
}
+24
View File
@@ -0,0 +1,24 @@
//! Common types and functions used throughout the crate.
use std::path::PathBuf;
use revive_dt_common::define_wrapper_type;
use revive_dt_compiler::Mode;
use revive_dt_format::case::CaseIdx;
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
define_wrapper_type!(
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct MetadataFilePath(PathBuf);
);
/// An absolute specifier for a test.
#[serde_as]
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize)]
pub struct TestSpecifier {
#[serde_as(as = "DisplayFromStr")]
pub solc_mode: Mode,
pub metadata_file_path: PathBuf,
pub case_idx: CaseIdx,
}
+9
View File
@@ -1 +1,10 @@
//! This crate implements the reporting infrastructure for the differential testing tool.
mod aggregator;
mod common;
mod reporter_event;
mod runner_event;
pub use aggregator::*;
pub use reporter_event::*;
pub use runner_event::*;
+4
View File
@@ -0,0 +1,4 @@
//! A reporter event sent by the report aggregator to the various listeners.
#[derive(Clone, Debug)]
pub enum ReporterEvent {}
+147
View File
@@ -0,0 +1,147 @@
//! The types associated with the events sent by the runner to the reporter.
use std::path::PathBuf;
use revive_dt_format::corpus::Corpus;
use revive_dt_format::metadata::Metadata;
use tokio::sync::{broadcast, oneshot};
use crate::ReporterEvent;
macro_rules! keep_if_doc {
(#[doc = $doc:expr]) => {
#[doc = $doc]
};
( $($_:tt)* ) => {};
}
/// Defines the runner-event which is sent from the test runners to the report aggregator.
///
/// This macro defines a number of things related to the reporting infrastructure and the interface
/// used. First of all, it defines the enum of all of the possible events that the runners can send
/// to the aggregator. For each one of the variants it defines a separate struct for it to allow the
/// variant field in the enum to be put in a [`Box`].
///
/// In addition to the above, it defines [`From`] implementations for the various event types for
/// the [`RunnerEvent`] enum essentially allowing for events such as [`CorpusFileDiscoveryEvent`] to
/// be converted into a [`RunnerEvent`].
///
/// In addition to the above, it also defines the [`RunnerEventReporter`] which is a wrapper around
/// an [`UnboundedSender`] allowing for events to be sent to the report aggregator.
///
/// With the above description, we can see that this macro defines almost all of the interface of
/// the reporting infrastructure, from the enum itself, to its associated types, and also to the
/// reporter that's used to report events to the aggregator.
///
/// [`UnboundedSender`]: tokio::sync::mpsc::UnboundedSender
macro_rules! define_event {
(
$(#[$enum_meta: meta])*
$vis: vis enum $ident: ident {
$(
$(#[$variant_meta: meta])*
$variant_ident: ident {
$(
$(#[$field_meta: meta])*
$field_ident: ident: $field_ty: ty
),* $(,)?
}
),* $(,)?
}
) => {
paste::paste! {
$(#[$enum_meta])*
$vis enum $ident {
$(
$(#[$variant_meta])*
$variant_ident(Box<[<$variant_ident Event>]>)
),*
}
$(
$(#[$variant_meta])*
$vis struct [<$variant_ident Event>] {
$(
$(#[$field_meta])*
$vis $field_ident: $field_ty
),*
}
)*
$(
impl From<[<$variant_ident Event>]> for $ident {
fn from(value: [<$variant_ident Event>]) -> Self {
Self::$variant_ident(Box::new(value))
}
}
)*
/// Provides a way to report events to the aggregator.
///
/// Under the hood, this is a wrapper around an [`UnboundedSender`] which abstracts away
/// the fact that channels are used and that implements high-level methods for reporting
/// various events to the aggregator.
#[derive(Clone, Debug)]
pub struct [< $ident Reporter >]($vis tokio::sync::mpsc::UnboundedSender<$ident>);
impl From<tokio::sync::mpsc::UnboundedSender<$ident>> for [< $ident Reporter >] {
fn from(value: tokio::sync::mpsc::UnboundedSender<$ident>) -> Self {
Self(value)
}
}
impl [< $ident Reporter >] {
fn report(&self, event: impl Into<$ident>) -> anyhow::Result<()> {
self.0.send(event.into()).map_err(Into::into)
}
$(
keep_if_doc!($(#[$variant_meta])*);
pub fn [< report_ $variant_ident:snake _event >](&self, $($field_ident: impl Into<$field_ty>),*) -> anyhow::Result<()> {
self.report([< $variant_ident Event >] {
$($field_ident: $field_ident.into()),*
})
}
)*
}
}
};
}
define_event! {
/// An event type that's sent by the test runners/drivers to the report aggregator.
pub(crate) enum RunnerEvent {
/// An event emitted by the reporter when it wishes to listen to events emitted by the
/// aggregator.
SubscribeToEvents {
/// The channel that the aggregator is to send the receive side of the channel on.
tx: oneshot::Sender<broadcast::Receiver<ReporterEvent>>
},
/// An event emitted by runners when they've discovered a corpus file.
CorpusFileDiscovery {
/// The contents of the corpus file.
corpus: Corpus
},
/// An event emitted by runners when they've discovered a metadata file.
MetadataFileDiscovery {
/// The path of the metadata file discovered.
path: PathBuf,
/// The content of the metadata file.
metadata: Metadata
},
/// An event emitted by the runners when the execution is completed and the aggregator can
/// stop.
ExecutionCompleted {}
}
}
/// An extension to the [`Reporter`] implemented by the macro.
impl RunnerEventReporter {
pub async fn subscribe(&self) -> anyhow::Result<broadcast::Receiver<ReporterEvent>> {
let (tx, rx) = oneshot::channel::<broadcast::Receiver<ReporterEvent>>();
self.report_subscribe_to_events_event(tx)?;
rx.await.map_err(Into::into)
}
}
pub type Reporter = RunnerEventReporter;