From 088f67a6cd7f1482e080c852f3896b9226d68f20 Mon Sep 17 00:00:00 2001 From: Omar Abdulla Date: Thu, 4 Dec 2025 12:56:56 +0300 Subject: [PATCH] Improve the transaction submission logic --- .../src/differential_benchmarks/driver.rs | 71 ++++++++++--------- .../src/differential_benchmarks/watcher.rs | 17 +++-- 2 files changed, 47 insertions(+), 41 deletions(-) diff --git a/crates/core/src/differential_benchmarks/driver.rs b/crates/core/src/differential_benchmarks/driver.rs index 078afd2..cc7ffad 100644 --- a/crates/core/src/differential_benchmarks/driver.rs +++ b/crates/core/src/differential_benchmarks/driver.rs @@ -1,6 +1,5 @@ use std::{ collections::HashMap, - ops::ControlFlow, sync::{ Arc, atomic::{AtomicUsize, Ordering}, @@ -13,6 +12,7 @@ use alloy::{ json_abi::JsonAbi, network::{Ethereum, TransactionBuilder}, primitives::{Address, TxHash, U256}, + providers::Provider, rpc::types::{ TransactionReceipt, TransactionRequest, trace::geth::{ @@ -22,12 +22,9 @@ use alloy::{ }, }; use anyhow::{Context as _, Result, bail}; -use futures::TryFutureExt; +use futures::{FutureExt as _, TryFutureExt}; use indexmap::IndexMap; -use revive_dt_common::{ - futures::{PollingWaitBehavior, poll}, - types::PrivateKeyAllocator, -}; +use revive_dt_common::types::PrivateKeyAllocator; use revive_dt_format::{ metadata::{ContractInstance, ContractPathAndIdent}, steps::{ @@ -37,7 +34,7 @@ use revive_dt_format::{ traits::{ResolutionContext, ResolverApi}, }; use tokio::sync::{Mutex, OnceCell, mpsc::UnboundedSender}; -use tracing::{Instrument, Span, debug, error, field::display, info, info_span, instrument}; +use tracing::{Span, debug, error, field::display, info, instrument}; use crate::{ differential_benchmarks::{ExecutionState, WatcherEvent}, @@ -373,12 +370,25 @@ where .await?; let (tx_hash, receipt_future) = self - .execute_transaction(tx, Some(step_path), Duration::from_secs(30 * 60)) + .execute_transaction(tx.clone(), Some(step_path), Duration::from_secs(30 * 60)) .await?; if self.await_transaction_inclusion { - let _ = receipt_future + let receipt = receipt_future .await .context("Failed while waiting for transaction inclusion in block")?; + + if !receipt.status() { + error!( + ?tx, + tx.hash = %receipt.transaction_hash, + ?receipt, + "Encountered a failing benchmark transaction" + ); + bail!( + "Encountered a failing transaction in benchmarks: {}", + receipt.transaction_hash + ) + } } Ok(tx_hash) @@ -694,7 +704,12 @@ where #[instrument( level = "info", skip_all, - fields(driver_id = self.driver_id, transaction_hash = tracing::field::Empty) + fields( + driver_id = self.driver_id, + transaction = ?transaction, + transaction_hash = tracing::field::Empty + ), + err(Debug) )] async fn execute_transaction( &self, @@ -703,10 +718,19 @@ where receipt_wait_duration: Duration, ) -> anyhow::Result<(TxHash, impl Future>)> { let node = self.platform_information.node; - let transaction_hash = node - .submit_transaction(transaction) + let provider = node.provider().await.context("Creating provider failed")?; + + let pending_transaction_builder = provider + .send_transaction(transaction) .await .context("Failed to submit transaction")?; + + let transaction_hash = *pending_transaction_builder.tx_hash(); + let receipt_future = pending_transaction_builder + .with_timeout(Some(receipt_wait_duration)) + .with_required_confirmations(2) + .get_receipt() + .map(|res| res.context("Failed to get the receipt of the transaction")); Span::current().record("transaction_hash", display(transaction_hash)); info!("Submitted transaction"); @@ -719,28 +743,7 @@ where .context("Failed to send the transaction hash to the watcher")?; }; - Ok((transaction_hash, async move { - info!("Starting to poll for transaction receipt"); - poll( - receipt_wait_duration, - PollingWaitBehavior::Constant(Duration::from_secs(1)), - || { - async move { - match node.get_receipt(transaction_hash).await { - Ok(receipt) => { - info!("Polling succeeded, receipt found"); - Ok(ControlFlow::Break(receipt)) - } - Err(_) => Ok(ControlFlow::Continue(())), - } - } - .instrument(info_span!("Polling for receipt")) - }, - ) - .instrument(info_span!("Polling for receipt", %transaction_hash)) - .await - .inspect(|_| info!("Found the transaction receipt")) - })) + Ok((transaction_hash, receipt_future)) } // endregion:Transaction Execution } diff --git a/crates/core/src/differential_benchmarks/watcher.rs b/crates/core/src/differential_benchmarks/watcher.rs index 2d96920..129067b 100644 --- a/crates/core/src/differential_benchmarks/watcher.rs +++ b/crates/core/src/differential_benchmarks/watcher.rs @@ -139,23 +139,18 @@ impl Watcher { break; } - info!( - block_number = block.ethereum_block_information.block_number, - block_tx_count = block.ethereum_block_information.transaction_hashes.len(), - remaining_transactions = watch_for_transaction_hashes.read().await.len(), - "Observed a block" - ); - // Remove all of the transaction hashes observed in this block from the txs we // are currently watching for. let mut watch_for_transaction_hashes = watch_for_transaction_hashes.write().await; + let mut relevant_transactions_observed = 0; for tx_hash in block.ethereum_block_information.transaction_hashes.iter() { let Some((step_path, submission_time)) = watch_for_transaction_hashes.remove(tx_hash) else { continue; }; + relevant_transactions_observed += 1; let transaction_information = TransactionInformation { transaction_hash: *tx_hash, submission_timestamp: submission_time @@ -172,6 +167,14 @@ impl Watcher { ) .expect("Can't fail") } + + info!( + block_number = block.ethereum_block_information.block_number, + block_tx_count = block.ethereum_block_information.transaction_hashes.len(), + relevant_transactions_observed, + remaining_transactions = watch_for_transaction_hashes.len(), + "Observed a block" + ); } info!("Watcher's Block Watching Task Finished");