Fix issues in the benchmarks driver

This commit is contained in:
Omar Abdulla
2025-10-03 01:26:19 +03:00
parent ff368b8444
commit fe7eaae984
3 changed files with 46 additions and 39 deletions
@@ -36,7 +36,7 @@ use revive_dt_format::{
traits::{ResolutionContext, ResolverApi}, traits::{ResolutionContext, ResolverApi},
}; };
use tokio::sync::{Mutex, mpsc::UnboundedSender}; use tokio::sync::{Mutex, mpsc::UnboundedSender};
use tracing::{Instrument, Span, debug, error, field::display, info, info_span, instrument, trace}; use tracing::{Instrument, Span, debug, error, field::display, info, info_span, instrument};
use crate::{ use crate::{
differential_benchmarks::{ExecutionState, WatcherEvent}, differential_benchmarks::{ExecutionState, WatcherEvent},
@@ -482,7 +482,8 @@ where
execution_state: self.execution_state.clone(), execution_state: self.execution_state.clone(),
steps_executed: 0, steps_executed: 0,
steps_iterator: { steps_iterator: {
step.steps let steps = step
.steps
.iter() .iter()
.cloned() .cloned()
.enumerate() .enumerate()
@@ -491,6 +492,8 @@ where
let step_path = step_path.append(step_idx); let step_path = step_path.append(step_idx);
(step_path, step) (step_path, step)
}) })
.collect::<Vec<_>>();
steps.into_iter()
}, },
watcher_tx: self.watcher_tx.clone(), watcher_tx: self.watcher_tx.clone(),
}) })
@@ -740,19 +743,21 @@ where
Span::current().record("transaction_hash", display(transaction_hash)); Span::current().record("transaction_hash", display(transaction_hash));
info!("Submitted transaction"); info!("Submitted transaction");
self.watcher_tx self.watcher_tx
.send(WatcherEvent::SubmittedTransaction { transaction_hash }) .send(WatcherEvent::SubmittedTransaction { transaction_hash })
.context("Failed to send the transaction hash to the watcher")?; .context("Failed to send the transaction hash to the watcher")?;
info!("Starting to poll for transaction receipt"); info!("Starting to poll for transaction receipt");
poll( poll(
Duration::from_secs(10 * 60), Duration::from_secs(30 * 60),
PollingWaitBehavior::Constant(Duration::from_secs(1)), PollingWaitBehavior::Constant(Duration::from_secs(1)),
|| { || {
async move { async move {
match node.get_receipt(transaction_hash).await { match node.get_receipt(transaction_hash).await {
Ok(receipt) => Ok(ControlFlow::Break(receipt)), Ok(receipt) => {
info!("Polling succeeded, receipt found");
Ok(ControlFlow::Break(receipt))
}
Err(_) => Ok(ControlFlow::Continue(())), Err(_) => Ok(ControlFlow::Continue(())),
} }
} }
@@ -150,19 +150,19 @@ impl Watcher {
// very simple reporting for the time being. // very simple reporting for the time being.
use std::io::Write; use std::io::Write;
let mut stdout = std::io::stdout().lock(); let mut stderr = std::io::stderr().lock();
writeln!( writeln!(
stdout, stderr,
"Watcher information for {}", "Watcher information for {}",
self.platform_identifier self.platform_identifier
)?; )?;
writeln!( writeln!(
stdout, stderr,
"block_number,block_timestamp,mined_gas,block_gas_limit,tx_count" "block_number,block_timestamp,mined_gas,block_gas_limit,tx_count"
)?; )?;
for block in mined_blocks_information { for block in mined_blocks_information {
writeln!( writeln!(
stdout, stderr,
"{},{},{},{},{}", "{},{},{},{},{}",
block.block_number, block.block_number,
block.block_timestamp, block.block_timestamp,
+32 -30
View File
@@ -17,7 +17,7 @@ use std::{
pin::Pin, pin::Pin,
process::{Command, Stdio}, process::{Command, Stdio},
sync::{ sync::{
Arc, Arc, LazyLock,
atomic::{AtomicU32, Ordering}, atomic::{AtomicU32, Ordering},
}, },
time::{Duration, SystemTime, UNIX_EPOCH}, time::{Duration, SystemTime, UNIX_EPOCH},
@@ -47,13 +47,14 @@ use alloy::{
}, },
}, },
}, },
transports::layers::RetryBackoffLayer,
}; };
use anyhow::Context as _; use anyhow::Context as _;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use revive_common::EVMVersion; use revive_common::EVMVersion;
use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as; use serde_with::serde_as;
use tokio::sync::OnceCell; use tokio::sync::{OnceCell, Semaphore};
use tracing::{Instrument, info, instrument}; use tracing::{Instrument, info, instrument};
use revive_dt_common::{ use revive_dt_common::{
@@ -133,6 +134,7 @@ pub struct LighthouseGethNode {
RootProvider, RootProvider,
>, >,
>, >,
http_provider_requests_semaphore: LazyLock<Semaphore>,
} }
impl LighthouseGethNode { impl LighthouseGethNode {
@@ -203,6 +205,7 @@ impl LighthouseGethNode {
nonce_manager: Default::default(), nonce_manager: Default::default(),
persistent_http_provider: OnceCell::const_new(), persistent_http_provider: OnceCell::const_new(),
persistent_ws_provider: OnceCell::const_new(), persistent_ws_provider: OnceCell::const_new(),
http_provider_requests_semaphore: LazyLock::new(|| Semaphore::const_new(500)),
} }
} }
@@ -239,10 +242,10 @@ impl LighthouseGethNode {
"--cache=4096".to_string(), "--cache=4096".to_string(),
"--txlookuplimit=0".to_string(), "--txlookuplimit=0".to_string(),
"--gcmode=archive".to_string(), "--gcmode=archive".to_string(),
"--txpool.globalslots=100000".to_string(), "--txpool.globalslots=500000".to_string(),
"--txpool.globalqueue=100000".to_string(), "--txpool.globalqueue=500000".to_string(),
"--txpool.accountslots=128".to_string(), "--txpool.accountslots=32768".to_string(),
"--txpool.accountqueue=1024".to_string(), "--txpool.accountqueue=32768".to_string(),
"--http.api=admin,engine,net,eth,web3,debug,txpool".to_string(), "--http.api=admin,engine,net,eth,web3,debug,txpool".to_string(),
"--http.addr=0.0.0.0".to_string(), "--http.addr=0.0.0.0".to_string(),
"--ws".to_string(), "--ws".to_string(),
@@ -250,9 +253,9 @@ impl LighthouseGethNode {
"--ws.port=8546".to_string(), "--ws.port=8546".to_string(),
"--ws.api=eth,net,web3,txpool,engine".to_string(), "--ws.api=eth,net,web3,txpool,engine".to_string(),
"--ws.origins=*".to_string(), "--ws.origins=*".to_string(),
"--verbosity=4".to_string(),
], ],
consensus_layer_extra_parameters: vec![ consensus_layer_extra_parameters: vec![
"--disable-quic".to_string(),
"--disable-deposit-contract-sync".to_string(), "--disable-deposit-contract-sync".to_string(),
], ],
}], }],
@@ -284,7 +287,12 @@ impl LighthouseGethNode {
public_port_start: Some(32000 + self.id as u16 * 1000), public_port_start: Some(32000 + self.id as u16 * 1000),
}, },
), ),
consensus_layer_port_publisher_parameters: Default::default(), consensus_layer_port_publisher_parameters: Some(
PortPublisherSingleItemParameters {
enabled: Some(true),
public_port_start: Some(59010 + self.id as u16 * 1000),
},
),
}), }),
}; };
@@ -405,12 +413,13 @@ impl LighthouseGethNode {
.get_or_try_init(|| async move { .get_or_try_init(|| async move {
info!("Initializing the WS provider of the lighthouse node"); info!("Initializing the WS provider of the lighthouse node");
let client = ClientBuilder::default() let client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 1000, 100))
.connect_with(BuiltInConnectionString::Ws( .connect_with(BuiltInConnectionString::Ws(
self.ws_connection_string.as_str().parse().unwrap(), self.ws_connection_string.as_str().parse().unwrap(),
None, None,
)) ))
.await?; .await?;
Ok(self.provider(client)) Ok(self.provider(client)).inspect(|_| info!("Initialized the WS provider"))
}) })
.await .await
.cloned() .cloned()
@@ -441,6 +450,7 @@ impl LighthouseGethNode {
.get_or_try_init(|| async move { .get_or_try_init(|| async move {
info!("Initializing the HTTP provider of the lighthouse node"); info!("Initializing the HTTP provider of the lighthouse node");
let client = ClientBuilder::default() let client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 1000, 100))
.connect_with(BuiltInConnectionString::Http( .connect_with(BuiltInConnectionString::Http(
self.http_connection_string.as_str().parse().unwrap(), self.http_connection_string.as_str().parse().unwrap(),
)) ))
@@ -483,11 +493,10 @@ impl LighthouseGethNode {
err(Debug), err(Debug),
)] )]
async fn fund_all_accounts(&self) -> anyhow::Result<()> { async fn fund_all_accounts(&self) -> anyhow::Result<()> {
let provider = self let mut full_block_subscriber = self
.ws_provider() .ws_provider()
.await .await
.context("Failed to create the WS provider")?; .context("Failed to create the WS provider")?
let mut full_block_subscriber = provider
.subscribe_full_blocks() .subscribe_full_blocks()
.into_stream() .into_stream()
.await .await
@@ -496,23 +505,14 @@ impl LighthouseGethNode {
let mut tx_hashes = futures::future::try_join_all( let mut tx_hashes = futures::future::try_join_all(
NetworkWallet::<Ethereum>::signer_addresses(self.wallet.as_ref()) NetworkWallet::<Ethereum>::signer_addresses(self.wallet.as_ref())
.enumerate() .enumerate()
.map(|(nonce, address)| { .map(|(nonce, address)| async move {
let provider = provider.clone(); let mut transaction = TransactionRequest::default()
async move { .from(self.prefunded_account_address)
let mut transaction = TransactionRequest::default() .to(address)
.from(self.prefunded_account_address) .nonce(nonce as _)
.to(address) .value(INITIAL_BALANCE.try_into().unwrap());
.nonce(nonce as _) transaction.chain_id = Some(420420420);
.gas_limit(25_000_000) self.submit_transaction(transaction).await
.max_fee_per_gas(1_000_000_000)
.max_priority_fee_per_gas(1_000_000_000)
.value(INITIAL_BALANCE.try_into().unwrap());
transaction.chain_id = Some(420420420);
provider
.send_transaction(transaction)
.await
.map(|tx| *tx.tx_hash())
}
}), }),
) )
.await .await
@@ -641,6 +641,8 @@ impl EthereumNode for LighthouseGethNode {
transaction: TransactionRequest, transaction: TransactionRequest,
) -> Pin<Box<dyn Future<Output = anyhow::Result<TxHash>> + '_>> { ) -> Pin<Box<dyn Future<Output = anyhow::Result<TxHash>> + '_>> {
Box::pin(async move { Box::pin(async move {
let _permit = self.http_provider_requests_semaphore.acquire().await;
let provider = self let provider = self
.http_provider() .http_provider()
.await .await
@@ -685,7 +687,7 @@ impl EthereumNode for LighthouseGethNode {
) -> Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + '_>> { ) -> Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + '_>> {
Box::pin(async move { Box::pin(async move {
let provider = self let provider = self
.ws_provider() .http_provider()
.await .await
.context("Failed to create provider for transaction execution")?; .context("Failed to create provider for transaction execution")?;
Self::internal_execute_transaction(transaction, provider).await Self::internal_execute_transaction(transaction, provider).await