From 325cb3f6e9bdd5560900c2c28659ee6143dc3751 Mon Sep 17 00:00:00 2001 From: Omar Abdulla Date: Mon, 20 Oct 2025 09:38:51 +0300 Subject: [PATCH] Report the step path to the watcher --- .../src/differential_benchmarks/driver.rs | 38 +++++++++++++------ .../differential_benchmarks/entry_point.rs | 1 + .../src/differential_benchmarks/watcher.rs | 23 ++++++++--- crates/format/src/steps.rs | 4 +- 4 files changed, 48 insertions(+), 18 deletions(-) diff --git a/crates/core/src/differential_benchmarks/driver.rs b/crates/core/src/differential_benchmarks/driver.rs index eb21fcc..84d5915 100644 --- a/crates/core/src/differential_benchmarks/driver.rs +++ b/crates/core/src/differential_benchmarks/driver.rs @@ -181,7 +181,7 @@ where code, ); let receipt = self - .execute_transaction(tx) + .execute_transaction(tx, None) .and_then(|(_, receipt_fut)| receipt_fut) .await .inspect_err(|err| { @@ -279,15 +279,15 @@ where #[instrument(level = "info", skip_all, fields(driver_id = self.driver_id))] pub async fn execute_function_call( &mut self, - _: &StepPath, + step_path: &StepPath, step: &FunctionCallStep, ) -> Result { let deployment_receipts = self - .handle_function_call_contract_deployment(step) + .handle_function_call_contract_deployment(step_path, step) .await .context("Failed to deploy contracts for the function call step")?; let transaction_hash = self - .handle_function_call_execution(step, deployment_receipts) + .handle_function_call_execution(step_path, step, deployment_receipts) .await .context("Failed to handle the function call execution")?; self.handle_function_call_variable_assignment(step, transaction_hash) @@ -298,6 +298,7 @@ where async fn handle_function_call_contract_deployment( &mut self, + step_path: &StepPath, step: &FunctionCallStep, ) -> Result> { let mut instances_we_must_deploy = IndexMap::::new(); @@ -329,7 +330,13 @@ where .await? }; if let (_, _, Some(receipt)) = self - .get_or_deploy_contract_instance(&instance, caller, calldata, value) + .get_or_deploy_contract_instance( + &instance, + caller, + calldata, + value, + Some(step_path), + ) .await .context("Failed to get or deploy contract instance during input execution")? { @@ -342,6 +349,7 @@ where async fn handle_function_call_execution( &mut self, + step_path: &StepPath, step: &FunctionCallStep, mut deployment_receipts: HashMap, ) -> Result { @@ -356,7 +364,7 @@ where let tx = step .as_transaction(self.resolver.as_ref(), self.default_resolution_context()) .await?; - Ok(self.execute_transaction(tx).await?.0) + Ok(self.execute_transaction(tx, Some(step_path)).await?.0) } } } @@ -520,6 +528,7 @@ where deployer: Address, calldata: Option<&Calldata>, value: Option, + step_path: Option<&StepPath>, ) -> Result<(Address, JsonAbi, Option)> { if let Some((_, address, abi)) = self .execution_state @@ -535,7 +544,7 @@ where } else { info!("Contract instance requires deployment."); let (address, abi, receipt) = self - .deploy_contract(contract_instance, deployer, calldata, value) + .deploy_contract(contract_instance, deployer, calldata, value, step_path) .await .context("Failed to deploy contract")?; info!( @@ -562,6 +571,7 @@ where deployer: Address, calldata: Option<&Calldata>, value: Option, + step_path: Option<&StepPath>, ) -> Result<(Address, JsonAbi, TransactionReceipt)> { let Some(ContractPathAndIdent { contract_source_path, @@ -621,7 +631,7 @@ where }; let receipt = match self - .execute_transaction(tx) + .execute_transaction(tx, step_path) .and_then(|(_, receipt_fut)| receipt_fut) .await { @@ -671,6 +681,7 @@ where async fn execute_transaction( &self, transaction: TransactionRequest, + step_path: Option<&StepPath>, ) -> anyhow::Result<(TxHash, impl Future>)> { let node = self.platform_information.node; let transaction_hash = node @@ -680,9 +691,14 @@ where Span::current().record("transaction_hash", display(transaction_hash)); info!("Submitted transaction"); - self.watcher_tx - .send(WatcherEvent::SubmittedTransaction { transaction_hash }) - .context("Failed to send the transaction hash to the watcher")?; + if let Some(step_path) = step_path { + self.watcher_tx + .send(WatcherEvent::SubmittedTransaction { + transaction_hash, + step_path: step_path.clone(), + }) + .context("Failed to send the transaction hash to the watcher")?; + }; Ok((transaction_hash, async move { info!("Starting to poll for transaction receipt"); diff --git a/crates/core/src/differential_benchmarks/entry_point.rs b/crates/core/src/differential_benchmarks/entry_point.rs index 16c5c21..2e5fc64 100644 --- a/crates/core/src/differential_benchmarks/entry_point.rs +++ b/crates/core/src/differential_benchmarks/entry_point.rs @@ -151,6 +151,7 @@ pub async fn handle_differential_benchmarks( .subscribe_to_full_blocks_information() .await .context("Failed to subscribe to full blocks information from the node")?, + reporter.clone(), ); let driver = Driver::new( platform_information, diff --git a/crates/core/src/differential_benchmarks/watcher.rs b/crates/core/src/differential_benchmarks/watcher.rs index 0430912..c3c94f3 100644 --- a/crates/core/src/differential_benchmarks/watcher.rs +++ b/crates/core/src/differential_benchmarks/watcher.rs @@ -1,10 +1,12 @@ -use std::{collections::HashSet, pin::Pin, sync::Arc}; +use std::{collections::HashMap, pin::Pin, sync::Arc}; use alloy::primitives::{BlockNumber, TxHash}; use anyhow::Result; use futures::{Stream, StreamExt}; use revive_dt_common::types::PlatformIdentifier; +use revive_dt_format::steps::StepPath; use revive_dt_node_interaction::MinedBlockInformation; +use revive_dt_report::Reporter; use tokio::sync::{ RwLock, mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, @@ -25,12 +27,16 @@ pub struct Watcher { /// This is a stream of the blocks that were mined by the node. This is for a single platform /// and a single node from that platform. blocks_stream: Pin>>, + + /// The reporter used to send events to the report aggregator. + reporter: Reporter, } impl Watcher { pub fn new( platform_identifier: PlatformIdentifier, blocks_stream: Pin>>, + reporter: Reporter, ) -> (Self, UnboundedSender) { let (tx, rx) = unbounded_channel::(); ( @@ -38,6 +44,7 @@ impl Watcher { platform_identifier, rx, blocks_stream, + reporter, }, tx, ) @@ -61,7 +68,8 @@ impl Watcher { // This is the set of the transaction hashes that the watcher should be looking for and // watch for them in the blocks. The watcher will keep watching for blocks until it sees // that all of the transactions that it was watching for has been seen in the mined blocks. - let watch_for_transaction_hashes = Arc::new(RwLock::new(HashSet::::new())); + let watch_for_transaction_hashes = + Arc::new(RwLock::new(HashMap::::new())); // A boolean that keeps track of whether all of the transactions were submitted or if more // txs are expected to come through the receive side of the channel. We do not want to rely @@ -81,11 +89,14 @@ impl Watcher { // contain nested repetitions and therefore there's no use in doing any // action if the repetitions are nested. WatcherEvent::RepetitionStartEvent { .. } => {} - WatcherEvent::SubmittedTransaction { transaction_hash } => { + WatcherEvent::SubmittedTransaction { + transaction_hash, + step_path, + } => { watch_for_transaction_hashes .write() .await - .insert(transaction_hash); + .insert(transaction_hash, step_path); } WatcherEvent::AllTransactionsSubmitted => { *all_transactions_submitted.write().await = true; @@ -184,7 +195,7 @@ impl Watcher { } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum WatcherEvent { /// Informs the watcher that it should begin watching for the blocks mined by the platforms. /// Before the watcher receives this event it will not be watching for the mined blocks. The @@ -204,6 +215,8 @@ pub enum WatcherEvent { SubmittedTransaction { /// The hash of the submitted transaction. transaction_hash: TxHash, + /// The step path of the step that the transaction belongs to. + step_path: StepPath, }, /// Informs the watcher that all of the transactions of this benchmark have been submitted and diff --git a/crates/format/src/steps.rs b/crates/format/src/steps.rs index 19ab310..27f12ca 100644 --- a/crates/format/src/steps.rs +++ b/crates/format/src/steps.rs @@ -45,12 +45,12 @@ pub enum Step { } define_wrapper_type!( - #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] + #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub struct StepIdx(usize) impl Display, FromStr; ); define_wrapper_type!( - #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] + #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[serde(try_from = "String", into = "String")] pub struct StepPath(Vec); );