Improve the transaction submission logic

This commit is contained in:
Omar Abdulla
2025-12-04 12:56:56 +03:00
parent c59a38c705
commit 088f67a6cd
2 changed files with 47 additions and 41 deletions
@@ -1,6 +1,5 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
ops::ControlFlow,
sync::{ sync::{
Arc, Arc,
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
@@ -13,6 +12,7 @@ use alloy::{
json_abi::JsonAbi, json_abi::JsonAbi,
network::{Ethereum, TransactionBuilder}, network::{Ethereum, TransactionBuilder},
primitives::{Address, TxHash, U256}, primitives::{Address, TxHash, U256},
providers::Provider,
rpc::types::{ rpc::types::{
TransactionReceipt, TransactionRequest, TransactionReceipt, TransactionRequest,
trace::geth::{ trace::geth::{
@@ -22,12 +22,9 @@ use alloy::{
}, },
}; };
use anyhow::{Context as _, Result, bail}; use anyhow::{Context as _, Result, bail};
use futures::TryFutureExt; use futures::{FutureExt as _, TryFutureExt};
use indexmap::IndexMap; use indexmap::IndexMap;
use revive_dt_common::{ use revive_dt_common::types::PrivateKeyAllocator;
futures::{PollingWaitBehavior, poll},
types::PrivateKeyAllocator,
};
use revive_dt_format::{ use revive_dt_format::{
metadata::{ContractInstance, ContractPathAndIdent}, metadata::{ContractInstance, ContractPathAndIdent},
steps::{ steps::{
@@ -37,7 +34,7 @@ use revive_dt_format::{
traits::{ResolutionContext, ResolverApi}, traits::{ResolutionContext, ResolverApi},
}; };
use tokio::sync::{Mutex, OnceCell, mpsc::UnboundedSender}; use tokio::sync::{Mutex, OnceCell, mpsc::UnboundedSender};
use tracing::{Instrument, Span, debug, error, field::display, info, info_span, instrument}; use tracing::{Span, debug, error, field::display, info, instrument};
use crate::{ use crate::{
differential_benchmarks::{ExecutionState, WatcherEvent}, differential_benchmarks::{ExecutionState, WatcherEvent},
@@ -373,12 +370,25 @@ where
.await?; .await?;
let (tx_hash, receipt_future) = self let (tx_hash, receipt_future) = self
.execute_transaction(tx, Some(step_path), Duration::from_secs(30 * 60)) .execute_transaction(tx.clone(), Some(step_path), Duration::from_secs(30 * 60))
.await?; .await?;
if self.await_transaction_inclusion { if self.await_transaction_inclusion {
let _ = receipt_future let receipt = receipt_future
.await .await
.context("Failed while waiting for transaction inclusion in block")?; .context("Failed while waiting for transaction inclusion in block")?;
if !receipt.status() {
error!(
?tx,
tx.hash = %receipt.transaction_hash,
?receipt,
"Encountered a failing benchmark transaction"
);
bail!(
"Encountered a failing transaction in benchmarks: {}",
receipt.transaction_hash
)
}
} }
Ok(tx_hash) Ok(tx_hash)
@@ -694,7 +704,12 @@ where
#[instrument( #[instrument(
level = "info", level = "info",
skip_all, skip_all,
fields(driver_id = self.driver_id, transaction_hash = tracing::field::Empty) fields(
driver_id = self.driver_id,
transaction = ?transaction,
transaction_hash = tracing::field::Empty
),
err(Debug)
)] )]
async fn execute_transaction( async fn execute_transaction(
&self, &self,
@@ -703,10 +718,19 @@ where
receipt_wait_duration: Duration, receipt_wait_duration: Duration,
) -> anyhow::Result<(TxHash, impl Future<Output = Result<TransactionReceipt>>)> { ) -> anyhow::Result<(TxHash, impl Future<Output = Result<TransactionReceipt>>)> {
let node = self.platform_information.node; let node = self.platform_information.node;
let transaction_hash = node let provider = node.provider().await.context("Creating provider failed")?;
.submit_transaction(transaction)
let pending_transaction_builder = provider
.send_transaction(transaction)
.await .await
.context("Failed to submit transaction")?; .context("Failed to submit transaction")?;
let transaction_hash = *pending_transaction_builder.tx_hash();
let receipt_future = pending_transaction_builder
.with_timeout(Some(receipt_wait_duration))
.with_required_confirmations(2)
.get_receipt()
.map(|res| res.context("Failed to get the receipt of the transaction"));
Span::current().record("transaction_hash", display(transaction_hash)); Span::current().record("transaction_hash", display(transaction_hash));
info!("Submitted transaction"); info!("Submitted transaction");
@@ -719,28 +743,7 @@ where
.context("Failed to send the transaction hash to the watcher")?; .context("Failed to send the transaction hash to the watcher")?;
}; };
Ok((transaction_hash, async move { Ok((transaction_hash, receipt_future))
info!("Starting to poll for transaction receipt");
poll(
receipt_wait_duration,
PollingWaitBehavior::Constant(Duration::from_secs(1)),
|| {
async move {
match node.get_receipt(transaction_hash).await {
Ok(receipt) => {
info!("Polling succeeded, receipt found");
Ok(ControlFlow::Break(receipt))
}
Err(_) => Ok(ControlFlow::Continue(())),
}
}
.instrument(info_span!("Polling for receipt"))
},
)
.instrument(info_span!("Polling for receipt", %transaction_hash))
.await
.inspect(|_| info!("Found the transaction receipt"))
}))
} }
// endregion:Transaction Execution // endregion:Transaction Execution
} }
@@ -139,23 +139,18 @@ impl Watcher {
break; break;
} }
info!(
block_number = block.ethereum_block_information.block_number,
block_tx_count = block.ethereum_block_information.transaction_hashes.len(),
remaining_transactions = watch_for_transaction_hashes.read().await.len(),
"Observed a block"
);
// Remove all of the transaction hashes observed in this block from the txs we // Remove all of the transaction hashes observed in this block from the txs we
// are currently watching for. // are currently watching for.
let mut watch_for_transaction_hashes = let mut watch_for_transaction_hashes =
watch_for_transaction_hashes.write().await; watch_for_transaction_hashes.write().await;
let mut relevant_transactions_observed = 0;
for tx_hash in block.ethereum_block_information.transaction_hashes.iter() { for tx_hash in block.ethereum_block_information.transaction_hashes.iter() {
let Some((step_path, submission_time)) = let Some((step_path, submission_time)) =
watch_for_transaction_hashes.remove(tx_hash) watch_for_transaction_hashes.remove(tx_hash)
else { else {
continue; continue;
}; };
relevant_transactions_observed += 1;
let transaction_information = TransactionInformation { let transaction_information = TransactionInformation {
transaction_hash: *tx_hash, transaction_hash: *tx_hash,
submission_timestamp: submission_time submission_timestamp: submission_time
@@ -172,6 +167,14 @@ impl Watcher {
) )
.expect("Can't fail") .expect("Can't fail")
} }
info!(
block_number = block.ethereum_block_information.block_number,
block_tx_count = block.ethereum_block_information.transaction_hashes.len(),
relevant_transactions_observed,
remaining_transactions = watch_for_transaction_hashes.len(),
"Observed a block"
);
} }
info!("Watcher's Block Watching Task Finished"); info!("Watcher's Block Watching Task Finished");