From fe7eaae984a0334014e7e9676f6b79dde4d50622 Mon Sep 17 00:00:00 2001 From: Omar Abdulla Date: Fri, 3 Oct 2025 01:26:19 +0300 Subject: [PATCH] Fix issues in the benchmarks driver --- .../src/differential_benchmarks/driver.rs | 15 +++-- .../src/differential_benchmarks/watcher.rs | 8 +-- crates/node/src/lighthouse_geth.rs | 62 ++++++++++--------- 3 files changed, 46 insertions(+), 39 deletions(-) diff --git a/crates/core/src/differential_benchmarks/driver.rs b/crates/core/src/differential_benchmarks/driver.rs index b1086d1..abdc6fd 100644 --- a/crates/core/src/differential_benchmarks/driver.rs +++ b/crates/core/src/differential_benchmarks/driver.rs @@ -36,7 +36,7 @@ use revive_dt_format::{ traits::{ResolutionContext, ResolverApi}, }; use tokio::sync::{Mutex, mpsc::UnboundedSender}; -use tracing::{Instrument, Span, debug, error, field::display, info, info_span, instrument, trace}; +use tracing::{Instrument, Span, debug, error, field::display, info, info_span, instrument}; use crate::{ differential_benchmarks::{ExecutionState, WatcherEvent}, @@ -482,7 +482,8 @@ where execution_state: self.execution_state.clone(), steps_executed: 0, steps_iterator: { - step.steps + let steps = step + .steps .iter() .cloned() .enumerate() @@ -491,6 +492,8 @@ where let step_path = step_path.append(step_idx); (step_path, step) }) + .collect::>(); + steps.into_iter() }, watcher_tx: self.watcher_tx.clone(), }) @@ -740,19 +743,21 @@ where Span::current().record("transaction_hash", display(transaction_hash)); info!("Submitted transaction"); - self.watcher_tx .send(WatcherEvent::SubmittedTransaction { transaction_hash }) .context("Failed to send the transaction hash to the watcher")?; info!("Starting to poll for transaction receipt"); poll( - Duration::from_secs(10 * 60), + Duration::from_secs(30 * 60), PollingWaitBehavior::Constant(Duration::from_secs(1)), || { async move { match node.get_receipt(transaction_hash).await { - Ok(receipt) => Ok(ControlFlow::Break(receipt)), + Ok(receipt) => { + info!("Polling succeeded, receipt found"); + Ok(ControlFlow::Break(receipt)) + } Err(_) => Ok(ControlFlow::Continue(())), } } diff --git a/crates/core/src/differential_benchmarks/watcher.rs b/crates/core/src/differential_benchmarks/watcher.rs index 696a7b8..12ea840 100644 --- a/crates/core/src/differential_benchmarks/watcher.rs +++ b/crates/core/src/differential_benchmarks/watcher.rs @@ -150,19 +150,19 @@ impl Watcher { // very simple reporting for the time being. use std::io::Write; - let mut stdout = std::io::stdout().lock(); + let mut stderr = std::io::stderr().lock(); writeln!( - stdout, + stderr, "Watcher information for {}", self.platform_identifier )?; writeln!( - stdout, + stderr, "block_number,block_timestamp,mined_gas,block_gas_limit,tx_count" )?; for block in mined_blocks_information { writeln!( - stdout, + stderr, "{},{},{},{},{}", block.block_number, block.block_timestamp, diff --git a/crates/node/src/lighthouse_geth.rs b/crates/node/src/lighthouse_geth.rs index 4d9eeb9..7ff7b01 100644 --- a/crates/node/src/lighthouse_geth.rs +++ b/crates/node/src/lighthouse_geth.rs @@ -17,7 +17,7 @@ use std::{ pin::Pin, process::{Command, Stdio}, sync::{ - Arc, + Arc, LazyLock, atomic::{AtomicU32, Ordering}, }, time::{Duration, SystemTime, UNIX_EPOCH}, @@ -47,13 +47,14 @@ use alloy::{ }, }, }, + transports::layers::RetryBackoffLayer, }; use anyhow::Context as _; use futures::{Stream, StreamExt}; use revive_common::EVMVersion; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_with::serde_as; -use tokio::sync::OnceCell; +use tokio::sync::{OnceCell, Semaphore}; use tracing::{Instrument, info, instrument}; use revive_dt_common::{ @@ -133,6 +134,7 @@ pub struct LighthouseGethNode { RootProvider, >, >, + http_provider_requests_semaphore: LazyLock, } impl LighthouseGethNode { @@ -203,6 +205,7 @@ impl LighthouseGethNode { nonce_manager: Default::default(), persistent_http_provider: OnceCell::const_new(), persistent_ws_provider: OnceCell::const_new(), + http_provider_requests_semaphore: LazyLock::new(|| Semaphore::const_new(500)), } } @@ -239,10 +242,10 @@ impl LighthouseGethNode { "--cache=4096".to_string(), "--txlookuplimit=0".to_string(), "--gcmode=archive".to_string(), - "--txpool.globalslots=100000".to_string(), - "--txpool.globalqueue=100000".to_string(), - "--txpool.accountslots=128".to_string(), - "--txpool.accountqueue=1024".to_string(), + "--txpool.globalslots=500000".to_string(), + "--txpool.globalqueue=500000".to_string(), + "--txpool.accountslots=32768".to_string(), + "--txpool.accountqueue=32768".to_string(), "--http.api=admin,engine,net,eth,web3,debug,txpool".to_string(), "--http.addr=0.0.0.0".to_string(), "--ws".to_string(), @@ -250,9 +253,9 @@ impl LighthouseGethNode { "--ws.port=8546".to_string(), "--ws.api=eth,net,web3,txpool,engine".to_string(), "--ws.origins=*".to_string(), - "--verbosity=4".to_string(), ], consensus_layer_extra_parameters: vec![ + "--disable-quic".to_string(), "--disable-deposit-contract-sync".to_string(), ], }], @@ -284,7 +287,12 @@ impl LighthouseGethNode { public_port_start: Some(32000 + self.id as u16 * 1000), }, ), - consensus_layer_port_publisher_parameters: Default::default(), + consensus_layer_port_publisher_parameters: Some( + PortPublisherSingleItemParameters { + enabled: Some(true), + public_port_start: Some(59010 + self.id as u16 * 1000), + }, + ), }), }; @@ -405,12 +413,13 @@ impl LighthouseGethNode { .get_or_try_init(|| async move { info!("Initializing the WS provider of the lighthouse node"); let client = ClientBuilder::default() + .layer(RetryBackoffLayer::new(10, 1000, 100)) .connect_with(BuiltInConnectionString::Ws( self.ws_connection_string.as_str().parse().unwrap(), None, )) .await?; - Ok(self.provider(client)) + Ok(self.provider(client)).inspect(|_| info!("Initialized the WS provider")) }) .await .cloned() @@ -441,6 +450,7 @@ impl LighthouseGethNode { .get_or_try_init(|| async move { info!("Initializing the HTTP provider of the lighthouse node"); let client = ClientBuilder::default() + .layer(RetryBackoffLayer::new(10, 1000, 100)) .connect_with(BuiltInConnectionString::Http( self.http_connection_string.as_str().parse().unwrap(), )) @@ -483,11 +493,10 @@ impl LighthouseGethNode { err(Debug), )] async fn fund_all_accounts(&self) -> anyhow::Result<()> { - let provider = self + let mut full_block_subscriber = self .ws_provider() .await - .context("Failed to create the WS provider")?; - let mut full_block_subscriber = provider + .context("Failed to create the WS provider")? .subscribe_full_blocks() .into_stream() .await @@ -496,23 +505,14 @@ impl LighthouseGethNode { let mut tx_hashes = futures::future::try_join_all( NetworkWallet::::signer_addresses(self.wallet.as_ref()) .enumerate() - .map(|(nonce, address)| { - let provider = provider.clone(); - async move { - let mut transaction = TransactionRequest::default() - .from(self.prefunded_account_address) - .to(address) - .nonce(nonce as _) - .gas_limit(25_000_000) - .max_fee_per_gas(1_000_000_000) - .max_priority_fee_per_gas(1_000_000_000) - .value(INITIAL_BALANCE.try_into().unwrap()); - transaction.chain_id = Some(420420420); - provider - .send_transaction(transaction) - .await - .map(|tx| *tx.tx_hash()) - } + .map(|(nonce, address)| async move { + let mut transaction = TransactionRequest::default() + .from(self.prefunded_account_address) + .to(address) + .nonce(nonce as _) + .value(INITIAL_BALANCE.try_into().unwrap()); + transaction.chain_id = Some(420420420); + self.submit_transaction(transaction).await }), ) .await @@ -641,6 +641,8 @@ impl EthereumNode for LighthouseGethNode { transaction: TransactionRequest, ) -> Pin> + '_>> { Box::pin(async move { + let _permit = self.http_provider_requests_semaphore.acquire().await; + let provider = self .http_provider() .await @@ -685,7 +687,7 @@ impl EthereumNode for LighthouseGethNode { ) -> Pin> + '_>> { Box::pin(async move { let provider = self - .ws_provider() + .http_provider() .await .context("Failed to create provider for transaction execution")?; Self::internal_execute_transaction(transaction, provider).await