mirror of
https://github.com/pezkuwichain/revive-differential-tests.git
synced 2026-04-25 01:08:00 +00:00
Core Benchmarking Infra (#175)
* Implement a solution for the pre-fund account limit * Update the account pre-funding handling * Fix the lighthouse node tracing issue * refactor existing dt infra * Implement the platform driver * Wire up the cleaned up driver implementation * Implement the core benchmarking components * Remove some debug logging * Fix issues in the benchmarks driver * Implement a global concurrency limit on provider requests * Update the concurrency limit * Update the concurrency limit * Cleanups * Update the lighthouse ports * Ignore certain tests * Update the new geth test
This commit is contained in:
@@ -0,0 +1,69 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use alloy::transports::BoxFuture;
|
||||
use tokio::sync::Semaphore;
|
||||
use tower::{Layer, Service};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ConcurrencyLimiterLayer {
|
||||
semaphore: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
impl ConcurrencyLimiterLayer {
|
||||
pub fn new(permit_count: usize) -> Self {
|
||||
Self {
|
||||
semaphore: Arc::new(Semaphore::new(permit_count)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Layer<S> for ConcurrencyLimiterLayer {
|
||||
type Service = ConcurrencyLimiterService<S>;
|
||||
|
||||
fn layer(&self, inner: S) -> Self::Service {
|
||||
ConcurrencyLimiterService {
|
||||
service: inner,
|
||||
semaphore: self.semaphore.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ConcurrencyLimiterService<S> {
|
||||
service: S,
|
||||
semaphore: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
impl<S, Request> Service<Request> for ConcurrencyLimiterService<S>
|
||||
where
|
||||
S: Service<Request> + Send,
|
||||
S::Future: Send + 'static,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.service.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Request) -> Self::Future {
|
||||
let semaphore = self.semaphore.clone();
|
||||
let future = self.service.call(req);
|
||||
|
||||
Box::pin(async move {
|
||||
let _permit = semaphore
|
||||
.acquire()
|
||||
.await
|
||||
.expect("Semaphore has been closed");
|
||||
tracing::debug!(
|
||||
available_permits = semaphore.available_permits(),
|
||||
"Acquired Semaphore Permit"
|
||||
);
|
||||
future.await
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,84 @@
|
||||
use alloy::{
|
||||
network::{Network, TransactionBuilder},
|
||||
providers::{
|
||||
Provider, SendableTx,
|
||||
fillers::{GasFiller, TxFiller},
|
||||
},
|
||||
transports::TransportResult,
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct FallbackGasFiller {
|
||||
inner: GasFiller,
|
||||
default_gas_limit: u64,
|
||||
default_max_fee_per_gas: u128,
|
||||
default_priority_fee: u128,
|
||||
}
|
||||
|
||||
impl FallbackGasFiller {
|
||||
pub fn new(
|
||||
default_gas_limit: u64,
|
||||
default_max_fee_per_gas: u128,
|
||||
default_priority_fee: u128,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: GasFiller,
|
||||
default_gas_limit,
|
||||
default_max_fee_per_gas,
|
||||
default_priority_fee,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for FallbackGasFiller {
|
||||
fn default() -> Self {
|
||||
FallbackGasFiller::new(25_000_000, 1_000_000_000, 1_000_000_000)
|
||||
}
|
||||
}
|
||||
|
||||
impl<N> TxFiller<N> for FallbackGasFiller
|
||||
where
|
||||
N: Network,
|
||||
{
|
||||
type Fillable = Option<<GasFiller as TxFiller<N>>::Fillable>;
|
||||
|
||||
fn status(
|
||||
&self,
|
||||
tx: &<N as Network>::TransactionRequest,
|
||||
) -> alloy::providers::fillers::FillerControlFlow {
|
||||
<GasFiller as TxFiller<N>>::status(&self.inner, tx)
|
||||
}
|
||||
|
||||
fn fill_sync(&self, _: &mut alloy::providers::SendableTx<N>) {}
|
||||
|
||||
async fn prepare<P: Provider<N>>(
|
||||
&self,
|
||||
provider: &P,
|
||||
tx: &<N as Network>::TransactionRequest,
|
||||
) -> TransportResult<Self::Fillable> {
|
||||
// Try to fetch GasFiller’s “fillable” (gas_price, base_fee, estimate_gas, …)
|
||||
// If it errors (i.e. tx would revert under eth_estimateGas), swallow it.
|
||||
match self.inner.prepare(provider, tx).await {
|
||||
Ok(fill) => Ok(Some(fill)),
|
||||
Err(_) => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
async fn fill(
|
||||
&self,
|
||||
fillable: Self::Fillable,
|
||||
mut tx: alloy::providers::SendableTx<N>,
|
||||
) -> TransportResult<SendableTx<N>> {
|
||||
if let Some(fill) = fillable {
|
||||
// our inner GasFiller succeeded — use it
|
||||
self.inner.fill(fill, tx).await
|
||||
} else {
|
||||
if let Some(builder) = tx.as_mut_builder() {
|
||||
builder.set_gas_limit(self.default_gas_limit);
|
||||
builder.set_max_fee_per_gas(self.default_max_fee_per_gas);
|
||||
builder.set_max_priority_fee_per_gas(self.default_priority_fee);
|
||||
}
|
||||
Ok(tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
mod concurrency_limiter;
|
||||
mod fallback_gas_provider;
|
||||
mod provider;
|
||||
|
||||
pub use concurrency_limiter::*;
|
||||
pub use fallback_gas_provider::*;
|
||||
pub use provider::*;
|
||||
@@ -0,0 +1,63 @@
|
||||
use std::sync::LazyLock;
|
||||
|
||||
use alloy::{
|
||||
network::{Network, NetworkWallet, TransactionBuilder4844},
|
||||
providers::{
|
||||
Identity, ProviderBuilder, RootProvider,
|
||||
fillers::{ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller, WalletFiller},
|
||||
},
|
||||
rpc::client::ClientBuilder,
|
||||
};
|
||||
use anyhow::{Context, Result};
|
||||
|
||||
use crate::provider_utils::{ConcurrencyLimiterLayer, FallbackGasFiller};
|
||||
|
||||
pub type ConcreteProvider<N, W> = FillProvider<
|
||||
JoinFill<
|
||||
JoinFill<JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>, NonceFiller>,
|
||||
WalletFiller<W>,
|
||||
>,
|
||||
RootProvider<N>,
|
||||
N,
|
||||
>;
|
||||
|
||||
pub async fn construct_concurrency_limited_provider<N, W>(
|
||||
rpc_url: &str,
|
||||
fallback_gas_filler: FallbackGasFiller,
|
||||
chain_id_filler: ChainIdFiller,
|
||||
nonce_filler: NonceFiller,
|
||||
wallet: W,
|
||||
) -> Result<ConcreteProvider<N, W>>
|
||||
where
|
||||
N: Network<TransactionRequest: TransactionBuilder4844>,
|
||||
W: NetworkWallet<N>,
|
||||
Identity: TxFiller<N>,
|
||||
FallbackGasFiller: TxFiller<N>,
|
||||
ChainIdFiller: TxFiller<N>,
|
||||
NonceFiller: TxFiller<N>,
|
||||
WalletFiller<W>: TxFiller<N>,
|
||||
{
|
||||
// This is a global limit on the RPC concurrency that applies to all of the providers created
|
||||
// by the framework. With this limit, it means that we can have a maximum of N concurrent
|
||||
// requests at any point of time and no more than that. This is done in an effort to stabilize
|
||||
// the framework from some of the interment issues that we've been seeing related to RPC calls.
|
||||
static GLOBAL_CONCURRENCY_LIMITER_LAYER: LazyLock<ConcurrencyLimiterLayer> =
|
||||
LazyLock::new(|| ConcurrencyLimiterLayer::new(10));
|
||||
|
||||
let client = ClientBuilder::default()
|
||||
.layer(GLOBAL_CONCURRENCY_LIMITER_LAYER.clone())
|
||||
.connect(rpc_url)
|
||||
.await
|
||||
.context("Failed to construct the RPC client")?;
|
||||
|
||||
let provider = ProviderBuilder::new()
|
||||
.disable_recommended_fillers()
|
||||
.network::<N>()
|
||||
.filler(fallback_gas_filler)
|
||||
.filler(chain_id_filler)
|
||||
.filler(nonce_filler)
|
||||
.wallet(wallet)
|
||||
.connect_client(client);
|
||||
|
||||
Ok(provider)
|
||||
}
|
||||
Reference in New Issue
Block a user