Wire up reporting to benchmarks (#195)

* Modify the structure of the `MinedBlockInformation`

* Report the step path to the watcher

* Make report format more benchmark friendly

* make report more benchmarks friendly

* Add more models to the report

* Remove corpus from the report

* Add step information to the benchmark report

* Include the contract information in the report

* Add the block information to the report

* compute metrics in each report

* Cleanup watcher from temp code
This commit is contained in:
Omar
2025-10-24 05:15:29 +03:00
committed by GitHub
parent f1a911545e
commit b71445b632
16 changed files with 670 additions and 196 deletions
@@ -181,7 +181,7 @@ where
code,
);
let receipt = self
.execute_transaction(tx)
.execute_transaction(tx, None)
.and_then(|(_, receipt_fut)| receipt_fut)
.await
.inspect_err(|err| {
@@ -218,6 +218,22 @@ where
.inspect_err(|err| error!(?err, "Post-linking compilation failed"))
.context("Failed to compile the post-link contracts")?;
for (contract_path, contract_name_to_info_mapping) in compiler_output.contracts.iter() {
for (contract_name, (contract_bytecode, _)) in contract_name_to_info_mapping.iter() {
let contract_bytecode = hex::decode(contract_bytecode)
.expect("Impossible for us to get an undecodable bytecode after linking");
self.platform_information
.reporter
.report_contract_information_event(
contract_path.to_path_buf(),
contract_name.clone(),
contract_bytecode.len(),
)
.expect("Should not fail");
}
}
self.execution_state = ExecutionState::new(
compiler_output.contracts,
deployed_libraries.unwrap_or_default(),
@@ -279,15 +295,15 @@ where
#[instrument(level = "info", skip_all, fields(driver_id = self.driver_id))]
pub async fn execute_function_call(
&mut self,
_: &StepPath,
step_path: &StepPath,
step: &FunctionCallStep,
) -> Result<usize> {
let deployment_receipts = self
.handle_function_call_contract_deployment(step)
.handle_function_call_contract_deployment(step_path, step)
.await
.context("Failed to deploy contracts for the function call step")?;
let transaction_hash = self
.handle_function_call_execution(step, deployment_receipts)
.handle_function_call_execution(step_path, step, deployment_receipts)
.await
.context("Failed to handle the function call execution")?;
self.handle_function_call_variable_assignment(step, transaction_hash)
@@ -298,6 +314,7 @@ where
async fn handle_function_call_contract_deployment(
&mut self,
step_path: &StepPath,
step: &FunctionCallStep,
) -> Result<HashMap<ContractInstance, TransactionReceipt>> {
let mut instances_we_must_deploy = IndexMap::<ContractInstance, bool>::new();
@@ -329,7 +346,13 @@ where
.await?
};
if let (_, _, Some(receipt)) = self
.get_or_deploy_contract_instance(&instance, caller, calldata, value)
.get_or_deploy_contract_instance(
&instance,
caller,
calldata,
value,
Some(step_path),
)
.await
.context("Failed to get or deploy contract instance during input execution")?
{
@@ -342,6 +365,7 @@ where
async fn handle_function_call_execution(
&mut self,
step_path: &StepPath,
step: &FunctionCallStep,
mut deployment_receipts: HashMap<ContractInstance, TransactionReceipt>,
) -> Result<TxHash> {
@@ -356,7 +380,7 @@ where
let tx = step
.as_transaction(self.resolver.as_ref(), self.default_resolution_context())
.await?;
Ok(self.execute_transaction(tx).await?.0)
Ok(self.execute_transaction(tx, Some(step_path)).await?.0)
}
}
}
@@ -520,6 +544,7 @@ where
deployer: Address,
calldata: Option<&Calldata>,
value: Option<EtherValue>,
step_path: Option<&StepPath>,
) -> Result<(Address, JsonAbi, Option<TransactionReceipt>)> {
if let Some((_, address, abi)) = self
.execution_state
@@ -535,7 +560,7 @@ where
} else {
info!("Contract instance requires deployment.");
let (address, abi, receipt) = self
.deploy_contract(contract_instance, deployer, calldata, value)
.deploy_contract(contract_instance, deployer, calldata, value, step_path)
.await
.context("Failed to deploy contract")?;
info!(
@@ -562,6 +587,7 @@ where
deployer: Address,
calldata: Option<&Calldata>,
value: Option<EtherValue>,
step_path: Option<&StepPath>,
) -> Result<(Address, JsonAbi, TransactionReceipt)> {
let Some(ContractPathAndIdent {
contract_source_path,
@@ -621,7 +647,7 @@ where
};
let receipt = match self
.execute_transaction(tx)
.execute_transaction(tx, step_path)
.and_then(|(_, receipt_fut)| receipt_fut)
.await
{
@@ -671,6 +697,7 @@ where
async fn execute_transaction(
&self,
transaction: TransactionRequest,
step_path: Option<&StepPath>,
) -> anyhow::Result<(TxHash, impl Future<Output = Result<TransactionReceipt>>)> {
let node = self.platform_information.node;
let transaction_hash = node
@@ -680,9 +707,14 @@ where
Span::current().record("transaction_hash", display(transaction_hash));
info!("Submitted transaction");
self.watcher_tx
.send(WatcherEvent::SubmittedTransaction { transaction_hash })
.context("Failed to send the transaction hash to the watcher")?;
if let Some(step_path) = step_path {
self.watcher_tx
.send(WatcherEvent::SubmittedTransaction {
transaction_hash,
step_path: step_path.clone(),
})
.context("Failed to send the transaction hash to the watcher")?;
};
Ok((transaction_hash, async move {
info!("Starting to poll for transaction receipt");
@@ -145,12 +145,14 @@ pub async fn handle_differential_benchmarks(
context.wallet_configuration.highest_private_key_exclusive(),
)));
let (watcher, watcher_tx) = Watcher::new(
platform_identifier,
platform_information
.node
.subscribe_to_full_blocks_information()
.await
.context("Failed to subscribe to full blocks information from the node")?,
test_definition
.reporter
.execution_specific_reporter(0usize, platform_identifier),
);
let driver = Driver::new(
platform_information,
@@ -1,10 +1,15 @@
use std::{collections::HashSet, pin::Pin, sync::Arc};
use std::{
collections::HashMap,
pin::Pin,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use alloy::primitives::{BlockNumber, TxHash};
use anyhow::Result;
use futures::{Stream, StreamExt};
use revive_dt_common::types::PlatformIdentifier;
use revive_dt_node_interaction::MinedBlockInformation;
use revive_dt_format::steps::StepPath;
use revive_dt_report::{ExecutionSpecificReporter, MinedBlockInformation, TransactionInformation};
use tokio::sync::{
RwLock,
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
@@ -15,9 +20,6 @@ use tracing::{info, instrument};
/// and MUST NOT be re-used between workloads since it holds important internal state for a given
/// workload and is not designed for reuse.
pub struct Watcher {
/// The identifier of the platform that this watcher is for.
platform_identifier: PlatformIdentifier,
/// The receive side of the channel that all of the drivers and various other parts of the code
/// send events to the watcher on.
rx: UnboundedReceiver<WatcherEvent>,
@@ -25,19 +27,22 @@ pub struct Watcher {
/// This is a stream of the blocks that were mined by the node. This is for a single platform
/// and a single node from that platform.
blocks_stream: Pin<Box<dyn Stream<Item = MinedBlockInformation>>>,
/// The reporter used to send events to the report aggregator.
reporter: ExecutionSpecificReporter,
}
impl Watcher {
pub fn new(
platform_identifier: PlatformIdentifier,
blocks_stream: Pin<Box<dyn Stream<Item = MinedBlockInformation>>>,
reporter: ExecutionSpecificReporter,
) -> (Self, UnboundedSender<WatcherEvent>) {
let (tx, rx) = unbounded_channel::<WatcherEvent>();
(
Self {
platform_identifier,
rx,
blocks_stream,
reporter,
},
tx,
)
@@ -61,7 +66,8 @@ impl Watcher {
// This is the set of the transaction hashes that the watcher should be looking for and
// watch for them in the blocks. The watcher will keep watching for blocks until it sees
// that all of the transactions that it was watching for has been seen in the mined blocks.
let watch_for_transaction_hashes = Arc::new(RwLock::new(HashSet::<TxHash>::new()));
let watch_for_transaction_hashes =
Arc::new(RwLock::new(HashMap::<TxHash, (StepPath, SystemTime)>::new()));
// A boolean that keeps track of whether all of the transactions were submitted or if more
// txs are expected to come through the receive side of the channel. We do not want to rely
@@ -81,11 +87,14 @@ impl Watcher {
// contain nested repetitions and therefore there's no use in doing any
// action if the repetitions are nested.
WatcherEvent::RepetitionStartEvent { .. } => {}
WatcherEvent::SubmittedTransaction { transaction_hash } => {
WatcherEvent::SubmittedTransaction {
transaction_hash,
step_path,
} => {
watch_for_transaction_hashes
.write()
.await
.insert(transaction_hash);
.insert(transaction_hash, (step_path, SystemTime::now()));
}
WatcherEvent::AllTransactionsSubmitted => {
*all_transactions_submitted.write().await = true;
@@ -97,25 +106,21 @@ impl Watcher {
}
}
};
let reporter = self.reporter.clone();
let block_information_watching_task = {
let watch_for_transaction_hashes = watch_for_transaction_hashes.clone();
let all_transactions_submitted = all_transactions_submitted.clone();
let mut blocks_information_stream = self.blocks_stream;
async move {
let mut mined_blocks_information = Vec::new();
// region:TEMPORARY
eprintln!("Watcher information for {}", self.platform_identifier);
eprintln!(
"block_number,block_timestamp,mined_gas,block_gas_limit,tx_count,ref_time,max_ref_time,proof_size,max_proof_size"
);
// endregion:TEMPORARY
while let Some(block) = blocks_information_stream.next().await {
// If the block number is equal to or less than the last block before the
// repetition then we ignore it and continue on to the next block.
if block.block_number <= ignore_block_before {
if block.ethereum_block_information.block_number <= ignore_block_before {
continue;
}
reporter
.report_block_mined_event(block.clone())
.expect("Can't fail");
if *all_transactions_submitted.read().await
&& watch_for_transaction_hashes.read().await.is_empty()
@@ -124,8 +129,8 @@ impl Watcher {
}
info!(
block_number = block.block_number,
block_tx_count = block.transaction_hashes.len(),
block_number = block.ethereum_block_information.block_number,
block_tx_count = block.ethereum_block_information.transaction_hashes.len(),
remaining_transactions = watch_for_transaction_hashes.read().await.len(),
"Observed a block"
);
@@ -134,33 +139,31 @@ impl Watcher {
// are currently watching for.
let mut watch_for_transaction_hashes =
watch_for_transaction_hashes.write().await;
for tx_hash in block.transaction_hashes.iter() {
watch_for_transaction_hashes.remove(tx_hash);
for tx_hash in block.ethereum_block_information.transaction_hashes.iter() {
let Some((step_path, submission_time)) =
watch_for_transaction_hashes.remove(tx_hash)
else {
continue;
};
let transaction_information = TransactionInformation {
transaction_hash: *tx_hash,
submission_timestamp: submission_time
.duration_since(UNIX_EPOCH)
.expect("Can't fail")
.as_secs() as _,
block_timestamp: block.ethereum_block_information.block_timestamp,
block_number: block.ethereum_block_information.block_number,
};
reporter
.report_step_transaction_information_event(
step_path,
transaction_information,
)
.expect("Can't fail")
}
// region:TEMPORARY
// TODO: The following core is TEMPORARY and will be removed once we have proper
// reporting in place and then it can be removed. This serves as as way of doing
// some very simple reporting for the time being.
eprintln!(
"\"{}\",\"{}\",\"{}\",\"{}\",\"{}\",\"{}\",\"{}\",\"{}\",\"{}\"",
block.block_number,
block.block_timestamp,
block.mined_gas,
block.block_gas_limit,
block.transaction_hashes.len(),
block.ref_time,
block.max_ref_time,
block.proof_size,
block.max_proof_size,
);
// endregion:TEMPORARY
mined_blocks_information.push(block);
}
info!("Watcher's Block Watching Task Finished");
mined_blocks_information
}
};
@@ -172,7 +175,7 @@ impl Watcher {
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum WatcherEvent {
/// Informs the watcher that it should begin watching for the blocks mined by the platforms.
/// Before the watcher receives this event it will not be watching for the mined blocks. The
@@ -192,6 +195,8 @@ pub enum WatcherEvent {
SubmittedTransaction {
/// The hash of the submitted transaction.
transaction_hash: TxHash,
/// The step path of the step that the transaction belongs to.
step_path: StepPath,
},
/// Informs the watcher that all of the transactions of this benchmark have been submitted and
+12 -3
View File
@@ -32,9 +32,18 @@ pub async fn create_test_definitions_stream<'a>(
only_execute_failed_tests: Option<&Report>,
reporter: Reporter,
) -> impl Stream<Item = TestDefinition<'a>> {
let cloned_reporter = reporter.clone();
stream::iter(
corpus
.cases_iterator()
.inspect(move |(metadata_file, ..)| {
cloned_reporter
.report_metadata_file_discovery_event(
metadata_file.metadata_file_path.clone(),
metadata_file.content.clone(),
)
.unwrap();
})
.map(move |(metadata_file, case_idx, case, mode)| {
let reporter = reporter.clone();
@@ -310,10 +319,10 @@ impl<'a> TestDefinition<'a> {
};
let test_case_status = report
.test_case_information
.execution_information
.get(&(self.metadata_file_path.to_path_buf().into()))
.and_then(|obj| obj.get(&self.mode))
.and_then(|obj| obj.get(&self.case_idx))
.and_then(|obj| obj.case_reports.get(&self.case_idx))
.and_then(|obj| obj.mode_execution_reports.get(&self.mode))
.and_then(|obj| obj.status.as_ref());
match test_case_status {