Implement the core benchmarking components

This commit is contained in:
Omar Abdulla
2025-10-02 18:42:38 +03:00
parent 8762702560
commit ba90c60c1b
29 changed files with 1973 additions and 1168 deletions
+223 -60
View File
@@ -24,29 +24,36 @@ use std::{
};
use alloy::{
eips::{BlockId, BlockNumberOrTag},
eips::BlockNumberOrTag,
genesis::{Genesis, GenesisAccount},
network::{Ethereum, EthereumWallet, NetworkWallet},
primitives::{
Address, BlockHash, BlockNumber, BlockTimestamp, StorageKey, TxHash, U256, address,
},
providers::{
Provider, ProviderBuilder,
Identity, Provider, ProviderBuilder, RootProvider,
ext::DebugApi,
fillers::{CachedNonceManager, ChainIdFiller, FillProvider, NonceFiller, TxFiller},
fillers::{
CachedNonceManager, ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller,
WalletFiller,
},
},
rpc::{
client::{BuiltInConnectionString, ClientBuilder, RpcClient},
types::{
EIP1186AccountProofResponse, TransactionRequest,
trace::geth::{DiffMode, GethDebugTracingOptions, PreStateConfig, PreStateFrame},
EIP1186AccountProofResponse, TransactionReceipt, TransactionRequest,
trace::geth::{
DiffMode, GethDebugTracingOptions, GethTrace, PreStateConfig, PreStateFrame,
},
},
},
};
use anyhow::Context as _;
use futures::{Stream, StreamExt};
use revive_common::EVMVersion;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use tokio::sync::OnceCell;
use tracing::{Instrument, info, instrument};
use revive_dt_common::{
@@ -55,7 +62,7 @@ use revive_dt_common::{
};
use revive_dt_config::*;
use revive_dt_format::traits::ResolverApi;
use revive_dt_node_interaction::EthereumNode;
use revive_dt_node_interaction::{EthereumNode, MinedBlockInformation};
use crate::{
Node,
@@ -101,7 +108,31 @@ pub struct LighthouseGethNode {
/* Provider Related Fields */
wallet: Arc<EthereumWallet>,
nonce_manager: CachedNonceManager,
chain_id_filler: ChainIdFiller,
persistent_http_provider: OnceCell<
FillProvider<
JoinFill<
JoinFill<
JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>,
NonceFiller,
>,
WalletFiller<Arc<EthereumWallet>>,
>,
RootProvider,
>,
>,
persistent_ws_provider: OnceCell<
FillProvider<
JoinFill<
JoinFill<
JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>,
NonceFiller,
>,
WalletFiller<Arc<EthereumWallet>>,
>,
RootProvider,
>,
>,
}
impl LighthouseGethNode {
@@ -170,7 +201,8 @@ impl LighthouseGethNode {
/* Provider Related Fields */
wallet: wallet.clone(),
nonce_manager: Default::default(),
chain_id_filler: Default::default(),
persistent_http_provider: OnceCell::const_new(),
persistent_ws_provider: OnceCell::const_new(),
}
}
@@ -205,6 +237,8 @@ impl LighthouseGethNode {
execution_layer_extra_parameters: vec![
"--nodiscover".to_string(),
"--cache=4096".to_string(),
"--txlookuplimit=0".to_string(),
"--gcmode=archive".to_string(),
"--txpool.globalslots=100000".to_string(),
"--txpool.globalqueue=100000".to_string(),
"--txpool.accountslots=128".to_string(),
@@ -216,6 +250,7 @@ impl LighthouseGethNode {
"--ws.port=8546".to_string(),
"--ws.api=eth,net,web3,txpool,engine".to_string(),
"--ws.origins=*".to_string(),
"--verbosity=4".to_string(),
],
consensus_layer_extra_parameters: vec![
"--disable-deposit-contract-sync".to_string(),
@@ -351,17 +386,34 @@ impl LighthouseGethNode {
fields(lighthouse_node_id = self.id, connection_string = self.ws_connection_string),
err(Debug),
)]
#[allow(clippy::type_complexity)]
async fn ws_provider(
&self,
) -> anyhow::Result<FillProvider<impl TxFiller<Ethereum>, impl Provider<Ethereum>, Ethereum>>
{
let client = ClientBuilder::default()
.connect_with(BuiltInConnectionString::Ws(
self.ws_connection_string.as_str().parse().unwrap(),
None,
))
.await?;
Ok(self.provider(client))
) -> anyhow::Result<
FillProvider<
JoinFill<
JoinFill<
JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>,
NonceFiller,
>,
WalletFiller<Arc<EthereumWallet>>,
>,
RootProvider,
>,
> {
self.persistent_ws_provider
.get_or_try_init(|| async move {
info!("Initializing the WS provider of the lighthouse node");
let client = ClientBuilder::default()
.connect_with(BuiltInConnectionString::Ws(
self.ws_connection_string.as_str().parse().unwrap(),
None,
))
.await?;
Ok(self.provider(client))
})
.await
.cloned()
}
#[instrument(
@@ -370,22 +422,46 @@ impl LighthouseGethNode {
fields(lighthouse_node_id = self.id, connection_string = self.ws_connection_string),
err(Debug),
)]
#[allow(clippy::type_complexity)]
async fn http_provider(
&self,
) -> anyhow::Result<FillProvider<impl TxFiller<Ethereum>, impl Provider<Ethereum>, Ethereum>>
{
let client = ClientBuilder::default()
.connect_with(BuiltInConnectionString::Http(
self.http_connection_string.as_str().parse().unwrap(),
))
.await?;
Ok(self.provider(client))
) -> anyhow::Result<
FillProvider<
JoinFill<
JoinFill<
JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>,
NonceFiller,
>,
WalletFiller<Arc<EthereumWallet>>,
>,
RootProvider,
>,
> {
self.persistent_http_provider
.get_or_try_init(|| async move {
info!("Initializing the HTTP provider of the lighthouse node");
let client = ClientBuilder::default()
.connect_with(BuiltInConnectionString::Http(
self.http_connection_string.as_str().parse().unwrap(),
))
.await?;
Ok(self.provider(client))
})
.await
.cloned()
}
#[allow(clippy::type_complexity)]
fn provider(
&self,
rpc_client: RpcClient,
) -> FillProvider<impl TxFiller<Ethereum>, impl Provider<Ethereum>, Ethereum> {
) -> FillProvider<
JoinFill<
JoinFill<JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>, NonceFiller>,
WalletFiller<Arc<EthereumWallet>>,
>,
RootProvider,
> {
ProviderBuilder::new()
.disable_recommended_fillers()
.filler(FallbackGasFiller::new(
@@ -393,7 +469,7 @@ impl LighthouseGethNode {
1_000_000_000,
1_000_000_000,
))
.filler(self.chain_id_filler.clone())
.filler(ChainIdFiller::new(Some(420420420)))
.filler(NonceFiller::new(self.nonce_manager.clone()))
.wallet(self.wallet.clone())
.connect_client(rpc_client)
@@ -407,60 +483,67 @@ impl LighthouseGethNode {
err(Debug),
)]
async fn fund_all_accounts(&self) -> anyhow::Result<()> {
let mut providers =
futures::future::try_join_all((0..100).map(|_| self.ws_provider()).collect::<Vec<_>>())
.await
.context("Failed to create the providers")?
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>();
let provider = self
.ws_provider()
.await
.context("Failed to create the WS provider")?;
let mut full_block_subscriber = provider
.subscribe_full_blocks()
.into_stream()
.await
.context("Full block subscriber")?;
let mut tx_hashes = futures::future::try_join_all(
NetworkWallet::<Ethereum>::signer_addresses(self.wallet.as_ref())
.enumerate()
.map(|(nonce, address)| {
let provider = providers[nonce % 100].clone();
let provider = provider.clone();
async move {
let transaction = TransactionRequest::default()
let mut transaction = TransactionRequest::default()
.from(self.prefunded_account_address)
.to(address)
.nonce(nonce as _)
.gas_limit(25_000_000)
.max_fee_per_gas(1_000_000_000)
.max_priority_fee_per_gas(1_000_000_000)
.value(INITIAL_BALANCE.try_into().unwrap());
transaction.chain_id = Some(420420420);
provider
.send_transaction(transaction)
.await
.map(|tx| *tx.tx_hash())
}
})
.collect::<Vec<_>>(),
}),
)
.await
.context("Failed to submit all transactions")?
.into_iter()
.collect::<HashSet<_>>();
let provider = providers.pop().unwrap();
let mut block_number = 0 as BlockNumber;
while !tx_hashes.is_empty() {
let Ok(Some(block)) = provider
.get_block(BlockId::Number(BlockNumberOrTag::Number(block_number)))
.await
else {
while let Some(block) = full_block_subscriber.next().await {
let Ok(block) = block else {
continue;
};
let block_number = block.number();
let block_timestamp = block.header.timestamp;
let block_transaction_count = block.transactions.len();
for hash in block.transactions.into_hashes().as_hashes().unwrap() {
tx_hashes.remove(hash);
}
info!(
block.number = block_number,
block.timestamp = block.header.timestamp,
block.timestamp = block_timestamp,
block.transaction_count = block_transaction_count,
remaining_transactions = tx_hashes.len(),
"Discovered new block in funding accounts"
"Discovered new block when funding accounts"
);
block_number += 1
if tx_hashes.is_empty() {
break;
}
}
Ok(())
@@ -468,11 +551,12 @@ impl LighthouseGethNode {
fn internal_execute_transaction<'a>(
transaction: TransactionRequest,
provider: Arc<
FillProvider<impl TxFiller<Ethereum> + 'a, impl Provider<Ethereum> + 'a, Ethereum>,
provider: FillProvider<
impl TxFiller<Ethereum> + 'a,
impl Provider<Ethereum> + Clone + 'a,
Ethereum,
>,
) -> Pin<Box<dyn Future<Output = anyhow::Result<alloy::rpc::types::TransactionReceipt>> + 'a>>
{
) -> Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + 'a>> {
Box::pin(async move {
let pending_transaction = provider
.send_transaction(transaction)
@@ -552,17 +636,60 @@ impl EthereumNode for LighthouseGethNode {
fields(lighthouse_node_id = self.id, connection_string = self.ws_connection_string),
err,
)]
fn execute_transaction(
fn submit_transaction(
&self,
transaction: TransactionRequest,
) -> Pin<Box<dyn Future<Output = anyhow::Result<alloy::rpc::types::TransactionReceipt>> + '_>>
{
) -> Pin<Box<dyn Future<Output = anyhow::Result<TxHash>> + '_>> {
Box::pin(async move {
let provider = self
.ws_provider()
.await
.map(Arc::new)
.context("Failed to create provider for transaction submission")?;
.context("Failed to create the provider for transaction submission")?;
tracing::trace!("Submit transaction, provider created");
let pending_transaction = provider
.send_transaction(transaction)
.await
.context("Failed to submit the transaction through the provider")?;
tracing::trace!("Submitted transaction");
Ok(*pending_transaction.tx_hash())
})
}
#[instrument(
level = "info",
skip_all,
fields(lighthouse_node_id = self.id, connection_string = self.ws_connection_string),
)]
fn get_receipt(
&self,
tx_hash: TxHash,
) -> Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + '_>> {
Box::pin(async move {
self.ws_provider()
.await
.context("Failed to create provider for getting the receipt")?
.get_transaction_receipt(tx_hash)
.await
.context("Failed to get the receipt of the transaction")?
.context("Failed to get the receipt of the transaction")
})
}
#[instrument(
level = "info",
skip_all,
fields(lighthouse_node_id = self.id, connection_string = self.ws_connection_string),
err,
)]
fn execute_transaction(
&self,
transaction: TransactionRequest,
) -> Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + '_>> {
Box::pin(async move {
let provider = self
.ws_provider()
.await
.context("Failed to create provider for transaction execution")?;
Self::internal_execute_transaction(transaction, provider).await
})
}
@@ -572,8 +699,7 @@ impl EthereumNode for LighthouseGethNode {
&self,
tx_hash: TxHash,
trace_options: GethDebugTracingOptions,
) -> Pin<Box<dyn Future<Output = anyhow::Result<alloy::rpc::types::trace::geth::GethTrace>> + '_>>
{
) -> Pin<Box<dyn Future<Output = anyhow::Result<GethTrace>> + '_>> {
Box::pin(async move {
let provider = Arc::new(
self.http_provider()
@@ -663,7 +789,7 @@ impl EthereumNode for LighthouseGethNode {
})
}
// #[instrument(level = "info", skip_all, fields(lighthouse_node_id = self.id))]
#[instrument(level = "info", skip_all, fields(lighthouse_node_id = self.id))]
fn resolver(
&self,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Arc<dyn ResolverApi + '_>>> + '_>> {
@@ -677,6 +803,43 @@ impl EthereumNode for LighthouseGethNode {
fn evm_version(&self) -> EVMVersion {
EVMVersion::Cancun
}
fn subscribe_to_full_blocks_information(
&self,
) -> Pin<
Box<
dyn Future<Output = anyhow::Result<Pin<Box<dyn Stream<Item = MinedBlockInformation>>>>>
+ '_,
>,
> {
Box::pin(async move {
let provider = self.ws_provider().await?;
let block_subscription = provider.subscribe_full_blocks().channel_size(1024);
let block_stream = block_subscription
.into_stream()
.await
.context("Failed to create the block stream")?;
let mined_block_information_stream = block_stream.filter_map(|block| async {
let block = block.ok()?;
Some(MinedBlockInformation {
block_number: block.number(),
block_timestamp: block.header.timestamp,
mined_gas: block.header.gas_used as _,
block_gas_limit: block.header.gas_limit as _,
transaction_hashes: block
.transactions
.into_hashes()
.as_hashes()
.expect("Must be hashes")
.to_vec(),
})
});
Ok(Box::pin(mined_block_information_stream)
as Pin<Box<dyn Stream<Item = MinedBlockInformation>>>)
})
}
}
pub struct LighthouseGethNodeResolver<F: TxFiller<Ethereum>, P: Provider<Ethereum>> {