Implement a global concurrency limit on provider requests

This commit is contained in:
Omar Abdulla
2025-10-04 15:47:56 +03:00
parent fe7eaae984
commit 2bb9594857
17 changed files with 241 additions and 210 deletions
Generated
+3
View File
@@ -4672,6 +4672,7 @@ dependencies = [
"sp-runtime",
"temp-dir",
"tokio",
"tower",
"tracing",
]
@@ -6270,8 +6271,10 @@ dependencies = [
"pin-project-lite",
"sync_wrapper",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
+1
View File
@@ -57,6 +57,7 @@ tokio = { version = "1.47.0", default-features = false, features = [
"process",
"rt",
] }
tower = { version = "0.5.2", features = ["limit"] }
uuid = { version = "1.8", features = ["v4"] }
tracing = { version = "0.1.41" }
tracing-appender = { version = "0.2.3" }
+3 -1
View File
@@ -14,7 +14,9 @@ use revive_dt_common::types::*;
use revive_dt_compiler::{SolidityCompiler, revive_resolc::Resolc, solc::Solc};
use revive_dt_config::*;
use revive_dt_node::{
Node, geth::GethNode, lighthouse_geth::LighthouseGethNode, substrate::SubstrateNode,
Node, node_implementations::geth::GethNode,
node_implementations::lighthouse_geth::LighthouseGethNode,
node_implementations::substrate::SubstrateNode,
};
use revive_dt_node_interaction::EthereumNode;
use tracing::info;
+1
View File
@@ -13,6 +13,7 @@ anyhow = { workspace = true }
alloy = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
tower = { workspace = true }
tokio = { workspace = true }
revive-common = { workspace = true }
+5
View File
@@ -1,5 +1,10 @@
use alloy::primitives::ChainId;
/// This constant defines how much Wei accounts are pre-seeded with in genesis.
///
/// Note: After changing this number, check that the tests for substrate work as we encountered
/// some issues with different values of the initial balance on substrate.
pub const INITIAL_BALANCE: u128 = 10u128.pow(37);
/// The chain id used for all of the chains spawned by the framework.
pub const CHAIN_ID: ChainId = 420420420;
+3
View File
@@ -0,0 +1,3 @@
mod process;
pub use process::*;
+3 -5
View File
@@ -3,12 +3,10 @@
use alloy::genesis::Genesis;
use revive_dt_node_interaction::EthereumNode;
pub mod common;
pub mod constants;
pub mod geth;
pub mod lighthouse_geth;
pub mod process;
pub mod substrate;
pub mod helpers;
pub mod node_implementations;
pub mod provider_utils;
/// An abstract interface for testing nodes.
pub trait Node: EthereumNode {
@@ -20,12 +20,9 @@ use alloy::{
network::{Ethereum, EthereumWallet, NetworkWallet},
primitives::{Address, BlockHash, BlockNumber, BlockTimestamp, StorageKey, TxHash, U256},
providers::{
Identity, Provider, ProviderBuilder, RootProvider,
Provider,
ext::DebugApi,
fillers::{
CachedNonceManager, ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller,
WalletFiller,
},
fillers::{CachedNonceManager, ChainIdFiller, NonceFiller},
},
rpc::types::{
EIP1186AccountProofResponse, TransactionReceipt, TransactionRequest,
@@ -50,9 +47,9 @@ use revive_dt_node_interaction::{EthereumNode, MinedBlockInformation};
use crate::{
Node,
common::FallbackGasFiller,
constants::INITIAL_BALANCE,
process::{Process, ProcessReadinessWaitBehavior},
constants::{CHAIN_ID, INITIAL_BALANCE},
helpers::{Process, ProcessReadinessWaitBehavior},
provider_utils::{ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider},
};
static NODE_COUNT: AtomicU32 = AtomicU32::new(0);
@@ -77,19 +74,7 @@ pub struct GethNode {
start_timeout: Duration,
wallet: Arc<EthereumWallet>,
nonce_manager: CachedNonceManager,
chain_id_filler: ChainIdFiller,
provider: OnceCell<
FillProvider<
JoinFill<
JoinFill<
JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>,
NonceFiller,
>,
WalletFiller<Arc<EthereumWallet>>,
>,
RootProvider,
>,
>,
provider: OnceCell<ConcreteProvider<Ethereum, Arc<EthereumWallet>>>,
}
impl GethNode {
@@ -138,7 +123,6 @@ impl GethNode {
handle: None,
start_timeout: geth_configuration.start_timeout_ms,
wallet: wallet.clone(),
chain_id_filler: Default::default(),
nonce_manager: Default::default(),
provider: Default::default(),
}
@@ -265,26 +249,18 @@ impl GethNode {
Ok(self)
}
async fn provider(
&self,
) -> anyhow::Result<
FillProvider<impl TxFiller<Ethereum>, impl Provider<Ethereum> + Clone, Ethereum>,
> {
async fn provider(&self) -> anyhow::Result<ConcreteProvider<Ethereum, Arc<EthereumWallet>>> {
self.provider
.get_or_try_init(|| async move {
ProviderBuilder::new()
.disable_recommended_fillers()
.filler(FallbackGasFiller::new(
25_000_000,
1_000_000_000,
1_000_000_000,
))
.filler(self.chain_id_filler.clone())
.filler(NonceFiller::new(self.nonce_manager.clone()))
.wallet(self.wallet.clone())
.connect(&self.connection_string)
.await
.map_err(Into::into)
construct_concurrency_limited_provider::<Ethereum, _>(
self.connection_string.as_str(),
FallbackGasFiller::default(),
ChainIdFiller::new(Some(CHAIN_ID)),
NonceFiller::new(self.nonce_manager.clone()),
self.wallet.clone(),
)
.await
.context("Failed to construct the provider")
})
.await
.cloned()
@@ -568,12 +544,12 @@ impl EthereumNode for GethNode {
}
}
pub struct GethNodeResolver<F: TxFiller<Ethereum>, P: Provider<Ethereum>> {
pub struct GethNodeResolver {
id: u32,
provider: FillProvider<F, P, Ethereum>,
provider: ConcreteProvider<Ethereum, Arc<EthereumWallet>>,
}
impl<F: TxFiller<Ethereum>, P: Provider<Ethereum>> ResolverApi for GethNodeResolver<F, P> {
impl ResolverApi for GethNodeResolver {
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn chain_id(
&self,
@@ -31,23 +31,16 @@ use alloy::{
Address, BlockHash, BlockNumber, BlockTimestamp, StorageKey, TxHash, U256, address,
},
providers::{
Identity, Provider, ProviderBuilder, RootProvider,
Provider,
ext::DebugApi,
fillers::{
CachedNonceManager, ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller,
WalletFiller,
fillers::{CachedNonceManager, ChainIdFiller, FillProvider, NonceFiller, TxFiller},
},
rpc::types::{
EIP1186AccountProofResponse, TransactionReceipt, TransactionRequest,
trace::geth::{
DiffMode, GethDebugTracingOptions, GethTrace, PreStateConfig, PreStateFrame,
},
},
rpc::{
client::{BuiltInConnectionString, ClientBuilder, RpcClient},
types::{
EIP1186AccountProofResponse, TransactionReceipt, TransactionRequest,
trace::geth::{
DiffMode, GethDebugTracingOptions, GethTrace, PreStateConfig, PreStateFrame,
},
},
},
transports::layers::RetryBackoffLayer,
};
use anyhow::Context as _;
use futures::{Stream, StreamExt};
@@ -67,9 +60,9 @@ use revive_dt_node_interaction::{EthereumNode, MinedBlockInformation};
use crate::{
Node,
common::FallbackGasFiller,
constants::INITIAL_BALANCE,
process::{Process, ProcessReadinessWaitBehavior},
constants::{CHAIN_ID, INITIAL_BALANCE},
helpers::{Process, ProcessReadinessWaitBehavior},
provider_utils::{ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider},
};
static NODE_COUNT: AtomicU32 = AtomicU32::new(0);
@@ -110,30 +103,8 @@ pub struct LighthouseGethNode {
wallet: Arc<EthereumWallet>,
nonce_manager: CachedNonceManager,
persistent_http_provider: OnceCell<
FillProvider<
JoinFill<
JoinFill<
JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>,
NonceFiller,
>,
WalletFiller<Arc<EthereumWallet>>,
>,
RootProvider,
>,
>,
persistent_ws_provider: OnceCell<
FillProvider<
JoinFill<
JoinFill<
JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>,
NonceFiller,
>,
WalletFiller<Arc<EthereumWallet>>,
>,
RootProvider,
>,
>,
persistent_http_provider: OnceCell<ConcreteProvider<Ethereum, Arc<EthereumWallet>>>,
persistent_ws_provider: OnceCell<ConcreteProvider<Ethereum, Arc<EthereumWallet>>>,
http_provider_requests_semaphore: LazyLock<Semaphore>,
}
@@ -262,7 +233,7 @@ impl LighthouseGethNode {
network_parameters: NetworkParameters {
preset: NetworkPreset::Mainnet,
seconds_per_slot: 12,
network_id: 420420420,
network_id: CHAIN_ID,
deposit_contract_address: address!("0x00000000219ab540356cBB839Cbe05303d7705Fa"),
altair_fork_epoch: 0,
bellatrix_fork_epoch: 0,
@@ -395,31 +366,18 @@ impl LighthouseGethNode {
err(Debug),
)]
#[allow(clippy::type_complexity)]
async fn ws_provider(
&self,
) -> anyhow::Result<
FillProvider<
JoinFill<
JoinFill<
JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>,
NonceFiller,
>,
WalletFiller<Arc<EthereumWallet>>,
>,
RootProvider,
>,
> {
async fn ws_provider(&self) -> anyhow::Result<ConcreteProvider<Ethereum, Arc<EthereumWallet>>> {
self.persistent_ws_provider
.get_or_try_init(|| async move {
info!("Initializing the WS provider of the lighthouse node");
let client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 1000, 100))
.connect_with(BuiltInConnectionString::Ws(
self.ws_connection_string.as_str().parse().unwrap(),
None,
))
.await?;
Ok(self.provider(client)).inspect(|_| info!("Initialized the WS provider"))
construct_concurrency_limited_provider::<Ethereum, _>(
self.ws_connection_string.as_str(),
FallbackGasFiller::default(),
ChainIdFiller::new(Some(CHAIN_ID)),
NonceFiller::new(self.nonce_manager.clone()),
self.wallet.clone(),
)
.await
.context("Failed to construct the provider")
})
.await
.cloned()
@@ -434,57 +392,23 @@ impl LighthouseGethNode {
#[allow(clippy::type_complexity)]
async fn http_provider(
&self,
) -> anyhow::Result<
FillProvider<
JoinFill<
JoinFill<
JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>,
NonceFiller,
>,
WalletFiller<Arc<EthereumWallet>>,
>,
RootProvider,
>,
> {
) -> anyhow::Result<ConcreteProvider<Ethereum, Arc<EthereumWallet>>> {
self.persistent_http_provider
.get_or_try_init(|| async move {
info!("Initializing the HTTP provider of the lighthouse node");
let client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 1000, 100))
.connect_with(BuiltInConnectionString::Http(
self.http_connection_string.as_str().parse().unwrap(),
))
.await?;
Ok(self.provider(client))
construct_concurrency_limited_provider::<Ethereum, _>(
self.http_connection_string.as_str(),
FallbackGasFiller::default(),
ChainIdFiller::new(Some(CHAIN_ID)),
NonceFiller::new(self.nonce_manager.clone()),
self.wallet.clone(),
)
.await
.context("Failed to construct the provider")
})
.await
.cloned()
}
#[allow(clippy::type_complexity)]
fn provider(
&self,
rpc_client: RpcClient,
) -> FillProvider<
JoinFill<
JoinFill<JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>, NonceFiller>,
WalletFiller<Arc<EthereumWallet>>,
>,
RootProvider,
> {
ProviderBuilder::new()
.disable_recommended_fillers()
.filler(FallbackGasFiller::new(
25_000_000,
1_000_000_000,
1_000_000_000,
))
.filler(ChainIdFiller::new(Some(420420420)))
.filler(NonceFiller::new(self.nonce_manager.clone()))
.wallet(self.wallet.clone())
.connect_client(rpc_client)
}
/// Funds all of the accounts in the Ethereum wallet from the initially funded account.
#[instrument(
level = "info",
@@ -511,7 +435,7 @@ impl LighthouseGethNode {
.to(address)
.nonce(nonce as _)
.value(INITIAL_BALANCE.try_into().unwrap());
transaction.chain_id = Some(420420420);
transaction.chain_id = Some(CHAIN_ID);
self.submit_transaction(transaction).await
}),
)
@@ -0,0 +1,3 @@
pub mod geth;
pub mod lighthouse_geth;
pub mod substrate;
@@ -23,12 +23,9 @@ use alloy::{
TxHash, U256,
},
providers::{
Identity, Provider, ProviderBuilder, RootProvider,
Provider,
ext::DebugApi,
fillers::{
CachedNonceManager, ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller,
WalletFiller,
},
fillers::{CachedNonceManager, ChainIdFiller, NonceFiller},
},
rpc::types::{
EIP1186AccountProofResponse, TransactionReceipt, TransactionRequest,
@@ -55,9 +52,9 @@ use tracing::instrument;
use crate::{
Node,
common::FallbackGasFiller,
constants::INITIAL_BALANCE,
process::{Process, ProcessReadinessWaitBehavior},
constants::{CHAIN_ID, INITIAL_BALANCE},
helpers::{Process, ProcessReadinessWaitBehavior},
provider_utils::{ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider},
};
static NODE_COUNT: AtomicU32 = AtomicU32::new(0);
@@ -79,21 +76,7 @@ pub struct SubstrateNode {
eth_proxy_process: Option<Process>,
wallet: Arc<EthereumWallet>,
nonce_manager: CachedNonceManager,
chain_id_filler: ChainIdFiller,
#[allow(clippy::type_complexity)]
provider: OnceCell<
FillProvider<
JoinFill<
JoinFill<
JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>,
NonceFiller,
>,
WalletFiller<Arc<EthereumWallet>>,
>,
RootProvider<ReviveNetwork>,
ReviveNetwork,
>,
>,
provider: OnceCell<ConcreteProvider<ReviveNetwork, Arc<EthereumWallet>>>,
}
impl SubstrateNode {
@@ -143,7 +126,6 @@ impl SubstrateNode {
substrate_process: None,
eth_proxy_process: None,
wallet: wallet.clone(),
chain_id_filler: Default::default(),
nonce_manager: Default::default(),
provider: Default::default(),
}
@@ -358,29 +340,18 @@ impl SubstrateNode {
async fn provider(
&self,
) -> anyhow::Result<
FillProvider<
impl TxFiller<ReviveNetwork>,
impl Provider<ReviveNetwork> + Clone,
ReviveNetwork,
>,
> {
) -> anyhow::Result<ConcreteProvider<ReviveNetwork, Arc<EthereumWallet>>> {
self.provider
.get_or_try_init(|| async move {
ProviderBuilder::new()
.disable_recommended_fillers()
.network::<ReviveNetwork>()
.filler(FallbackGasFiller::new(
25_000_000,
1_000_000_000,
1_000_000_000,
))
.filler(self.chain_id_filler.clone())
.filler(NonceFiller::new(self.nonce_manager.clone()))
.wallet(self.wallet.clone())
.connect(&self.rpc_url)
.await
.map_err(Into::into)
construct_concurrency_limited_provider::<ReviveNetwork, _>(
self.rpc_url.as_str(),
FallbackGasFiller::default(),
ChainIdFiller::new(Some(CHAIN_ID)),
NonceFiller::new(self.nonce_manager.clone()),
self.wallet.clone(),
)
.await
.context("Failed to construct the provider")
})
.await
.cloned()
@@ -436,7 +407,12 @@ impl EthereumNode for SubstrateNode {
&self,
transaction: TransactionRequest,
) -> Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + '_>> {
static SEMAPHORE: std::sync::LazyLock<tokio::sync::Semaphore> =
std::sync::LazyLock::new(|| tokio::sync::Semaphore::new(500));
Box::pin(async move {
let _permit = SEMAPHORE.acquire().await?;
let receipt = self
.provider()
.await
@@ -572,14 +548,12 @@ impl EthereumNode for SubstrateNode {
}
}
pub struct SubstrateNodeResolver<F: TxFiller<ReviveNetwork>, P: Provider<ReviveNetwork>> {
pub struct SubstrateNodeResolver {
id: u32,
provider: FillProvider<F, P, ReviveNetwork>,
provider: ConcreteProvider<ReviveNetwork, Arc<EthereumWallet>>,
}
impl<F: TxFiller<ReviveNetwork>, P: Provider<ReviveNetwork>> ResolverApi
for SubstrateNodeResolver<F, P>
{
impl ResolverApi for SubstrateNodeResolver {
#[instrument(level = "info", skip_all, fields(substrate_node_id = self.id))]
fn chain_id(
&self,
@@ -0,0 +1,65 @@
use std::sync::Arc;
use alloy::transports::BoxFuture;
use tokio::sync::Semaphore;
use tower::{Layer, Service};
#[derive(Clone, Debug)]
pub struct ConcurrencyLimiterLayer {
semaphore: Arc<Semaphore>,
}
impl ConcurrencyLimiterLayer {
pub fn new(permit_count: usize) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(permit_count)),
}
}
}
impl<S> Layer<S> for ConcurrencyLimiterLayer {
type Service = ConcurrencyLimiterService<S>;
fn layer(&self, inner: S) -> Self::Service {
ConcurrencyLimiterService {
service: inner,
semaphore: self.semaphore.clone(),
}
}
}
#[derive(Clone)]
pub struct ConcurrencyLimiterService<S> {
service: S,
semaphore: Arc<Semaphore>,
}
impl<S, Request> Service<Request> for ConcurrencyLimiterService<S>
where
S: Service<Request> + Send,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
let semaphore = self.semaphore.clone();
let future = self.service.call(req);
Box::pin(async move {
let _permit = semaphore
.acquire()
.await
.expect("Semaphore has been closed");
future.await
})
}
}
@@ -30,6 +30,12 @@ impl FallbackGasFiller {
}
}
impl Default for FallbackGasFiller {
fn default() -> Self {
FallbackGasFiller::new(25_000_000, 1_000_000_000, 1_000_000_000)
}
}
impl<N> TxFiller<N> for FallbackGasFiller
where
N: Network,
+7
View File
@@ -0,0 +1,7 @@
mod concurrency_limiter;
mod fallback_gas_provider;
mod provider;
pub use concurrency_limiter::*;
pub use fallback_gas_provider::*;
pub use provider::*;
@@ -0,0 +1,63 @@
use std::sync::LazyLock;
use alloy::{
network::{Network, NetworkWallet, TransactionBuilder4844},
providers::{
Identity, ProviderBuilder, RootProvider,
fillers::{ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller, WalletFiller},
},
rpc::client::ClientBuilder,
};
use anyhow::{Context, Result};
use crate::provider_utils::{ConcurrencyLimiterLayer, FallbackGasFiller};
pub type ConcreteProvider<N, W> = FillProvider<
JoinFill<
JoinFill<JoinFill<JoinFill<Identity, FallbackGasFiller>, ChainIdFiller>, NonceFiller>,
WalletFiller<W>,
>,
RootProvider<N>,
N,
>;
pub async fn construct_concurrency_limited_provider<N, W>(
rpc_url: &str,
fallback_gas_filler: FallbackGasFiller,
chain_id_filler: ChainIdFiller,
nonce_filler: NonceFiller,
wallet: W,
) -> Result<ConcreteProvider<N, W>>
where
N: Network<TransactionRequest: TransactionBuilder4844>,
W: NetworkWallet<N>,
Identity: TxFiller<N>,
FallbackGasFiller: TxFiller<N>,
ChainIdFiller: TxFiller<N>,
NonceFiller: TxFiller<N>,
WalletFiller<W>: TxFiller<N>,
{
// This is a global limit on the RPC concurrency that applies to all of the providers created
// by the framework. With this limit, it means that we can have a maximum of 1500 concurrent
// requests at any point of time and no more than that. This is done in an effort to stabilize
// the framework from some of the interment issues that we've been seeing related to RPC calls.
static GLOBAL_CONCURRENCY_LIMITER_LAYER: LazyLock<ConcurrencyLimiterLayer> =
LazyLock::new(|| ConcurrencyLimiterLayer::new(1500));
let client = ClientBuilder::default()
.layer(GLOBAL_CONCURRENCY_LIMITER_LAYER.clone())
.connect(rpc_url)
.await
.context("Failed to construct the RPC client")?;
let provider = ProviderBuilder::new()
.disable_recommended_fillers()
.network::<N>()
.filler(fallback_gas_filler)
.filler(chain_id_filler)
.filler(nonce_filler)
.wallet(wallet)
.connect_client(client);
Ok(provider)
}
+1 -1
View File
@@ -90,7 +90,7 @@ echo ""
# Run the tool
RUST_LOG="info,alloy_pubsub::service=error" cargo run --release -- test \
--platform revive-dev-node-polkavm-resolc \
--platform revive-dev-node-revm-solc \
--corpus "$CORPUS_FILE" \
--working-directory "$WORKDIR" \
--concurrency.number-of-nodes 5 \