Report the step path to the watcher

This commit is contained in:
Omar Abdulla
2025-10-20 09:38:51 +03:00
parent 3d5feea4a4
commit 325cb3f6e9
4 changed files with 48 additions and 18 deletions
@@ -181,7 +181,7 @@ where
code,
);
let receipt = self
.execute_transaction(tx)
.execute_transaction(tx, None)
.and_then(|(_, receipt_fut)| receipt_fut)
.await
.inspect_err(|err| {
@@ -279,15 +279,15 @@ where
#[instrument(level = "info", skip_all, fields(driver_id = self.driver_id))]
pub async fn execute_function_call(
&mut self,
_: &StepPath,
step_path: &StepPath,
step: &FunctionCallStep,
) -> Result<usize> {
let deployment_receipts = self
.handle_function_call_contract_deployment(step)
.handle_function_call_contract_deployment(step_path, step)
.await
.context("Failed to deploy contracts for the function call step")?;
let transaction_hash = self
.handle_function_call_execution(step, deployment_receipts)
.handle_function_call_execution(step_path, step, deployment_receipts)
.await
.context("Failed to handle the function call execution")?;
self.handle_function_call_variable_assignment(step, transaction_hash)
@@ -298,6 +298,7 @@ where
async fn handle_function_call_contract_deployment(
&mut self,
step_path: &StepPath,
step: &FunctionCallStep,
) -> Result<HashMap<ContractInstance, TransactionReceipt>> {
let mut instances_we_must_deploy = IndexMap::<ContractInstance, bool>::new();
@@ -329,7 +330,13 @@ where
.await?
};
if let (_, _, Some(receipt)) = self
.get_or_deploy_contract_instance(&instance, caller, calldata, value)
.get_or_deploy_contract_instance(
&instance,
caller,
calldata,
value,
Some(step_path),
)
.await
.context("Failed to get or deploy contract instance during input execution")?
{
@@ -342,6 +349,7 @@ where
async fn handle_function_call_execution(
&mut self,
step_path: &StepPath,
step: &FunctionCallStep,
mut deployment_receipts: HashMap<ContractInstance, TransactionReceipt>,
) -> Result<TxHash> {
@@ -356,7 +364,7 @@ where
let tx = step
.as_transaction(self.resolver.as_ref(), self.default_resolution_context())
.await?;
Ok(self.execute_transaction(tx).await?.0)
Ok(self.execute_transaction(tx, Some(step_path)).await?.0)
}
}
}
@@ -520,6 +528,7 @@ where
deployer: Address,
calldata: Option<&Calldata>,
value: Option<EtherValue>,
step_path: Option<&StepPath>,
) -> Result<(Address, JsonAbi, Option<TransactionReceipt>)> {
if let Some((_, address, abi)) = self
.execution_state
@@ -535,7 +544,7 @@ where
} else {
info!("Contract instance requires deployment.");
let (address, abi, receipt) = self
.deploy_contract(contract_instance, deployer, calldata, value)
.deploy_contract(contract_instance, deployer, calldata, value, step_path)
.await
.context("Failed to deploy contract")?;
info!(
@@ -562,6 +571,7 @@ where
deployer: Address,
calldata: Option<&Calldata>,
value: Option<EtherValue>,
step_path: Option<&StepPath>,
) -> Result<(Address, JsonAbi, TransactionReceipt)> {
let Some(ContractPathAndIdent {
contract_source_path,
@@ -621,7 +631,7 @@ where
};
let receipt = match self
.execute_transaction(tx)
.execute_transaction(tx, step_path)
.and_then(|(_, receipt_fut)| receipt_fut)
.await
{
@@ -671,6 +681,7 @@ where
async fn execute_transaction(
&self,
transaction: TransactionRequest,
step_path: Option<&StepPath>,
) -> anyhow::Result<(TxHash, impl Future<Output = Result<TransactionReceipt>>)> {
let node = self.platform_information.node;
let transaction_hash = node
@@ -680,9 +691,14 @@ where
Span::current().record("transaction_hash", display(transaction_hash));
info!("Submitted transaction");
self.watcher_tx
.send(WatcherEvent::SubmittedTransaction { transaction_hash })
.context("Failed to send the transaction hash to the watcher")?;
if let Some(step_path) = step_path {
self.watcher_tx
.send(WatcherEvent::SubmittedTransaction {
transaction_hash,
step_path: step_path.clone(),
})
.context("Failed to send the transaction hash to the watcher")?;
};
Ok((transaction_hash, async move {
info!("Starting to poll for transaction receipt");
@@ -151,6 +151,7 @@ pub async fn handle_differential_benchmarks(
.subscribe_to_full_blocks_information()
.await
.context("Failed to subscribe to full blocks information from the node")?,
reporter.clone(),
);
let driver = Driver::new(
platform_information,
@@ -1,10 +1,12 @@
use std::{collections::HashSet, pin::Pin, sync::Arc};
use std::{collections::HashMap, pin::Pin, sync::Arc};
use alloy::primitives::{BlockNumber, TxHash};
use anyhow::Result;
use futures::{Stream, StreamExt};
use revive_dt_common::types::PlatformIdentifier;
use revive_dt_format::steps::StepPath;
use revive_dt_node_interaction::MinedBlockInformation;
use revive_dt_report::Reporter;
use tokio::sync::{
RwLock,
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
@@ -25,12 +27,16 @@ pub struct Watcher {
/// This is a stream of the blocks that were mined by the node. This is for a single platform
/// and a single node from that platform.
blocks_stream: Pin<Box<dyn Stream<Item = MinedBlockInformation>>>,
/// The reporter used to send events to the report aggregator.
reporter: Reporter,
}
impl Watcher {
pub fn new(
platform_identifier: PlatformIdentifier,
blocks_stream: Pin<Box<dyn Stream<Item = MinedBlockInformation>>>,
reporter: Reporter,
) -> (Self, UnboundedSender<WatcherEvent>) {
let (tx, rx) = unbounded_channel::<WatcherEvent>();
(
@@ -38,6 +44,7 @@ impl Watcher {
platform_identifier,
rx,
blocks_stream,
reporter,
},
tx,
)
@@ -61,7 +68,8 @@ impl Watcher {
// This is the set of the transaction hashes that the watcher should be looking for and
// watch for them in the blocks. The watcher will keep watching for blocks until it sees
// that all of the transactions that it was watching for has been seen in the mined blocks.
let watch_for_transaction_hashes = Arc::new(RwLock::new(HashSet::<TxHash>::new()));
let watch_for_transaction_hashes =
Arc::new(RwLock::new(HashMap::<TxHash, StepPath>::new()));
// A boolean that keeps track of whether all of the transactions were submitted or if more
// txs are expected to come through the receive side of the channel. We do not want to rely
@@ -81,11 +89,14 @@ impl Watcher {
// contain nested repetitions and therefore there's no use in doing any
// action if the repetitions are nested.
WatcherEvent::RepetitionStartEvent { .. } => {}
WatcherEvent::SubmittedTransaction { transaction_hash } => {
WatcherEvent::SubmittedTransaction {
transaction_hash,
step_path,
} => {
watch_for_transaction_hashes
.write()
.await
.insert(transaction_hash);
.insert(transaction_hash, step_path);
}
WatcherEvent::AllTransactionsSubmitted => {
*all_transactions_submitted.write().await = true;
@@ -184,7 +195,7 @@ impl Watcher {
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum WatcherEvent {
/// Informs the watcher that it should begin watching for the blocks mined by the platforms.
/// Before the watcher receives this event it will not be watching for the mined blocks. The
@@ -204,6 +215,8 @@ pub enum WatcherEvent {
SubmittedTransaction {
/// The hash of the submitted transaction.
transaction_hash: TxHash,
/// The step path of the step that the transaction belongs to.
step_path: StepPath,
},
/// Informs the watcher that all of the transactions of this benchmark have been submitted and