diff --git a/Cargo.lock b/Cargo.lock index d2ae2af..aa09b1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4672,6 +4672,7 @@ dependencies = [ "sp-runtime", "temp-dir", "tokio", + "tower", "tracing", ] @@ -6270,8 +6271,10 @@ dependencies = [ "pin-project-lite", "sync_wrapper", "tokio", + "tokio-util", "tower-layer", "tower-service", + "tracing", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index e929337..b18c827 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,7 @@ tokio = { version = "1.47.0", default-features = false, features = [ "process", "rt", ] } +tower = { version = "0.5.2", features = ["limit"] } uuid = { version = "1.8", features = ["v4"] } tracing = { version = "0.1.41" } tracing-appender = { version = "0.2.3" } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index b50e619..5550a80 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -14,7 +14,9 @@ use revive_dt_common::types::*; use revive_dt_compiler::{SolidityCompiler, revive_resolc::Resolc, solc::Solc}; use revive_dt_config::*; use revive_dt_node::{ - Node, geth::GethNode, lighthouse_geth::LighthouseGethNode, substrate::SubstrateNode, + Node, node_implementations::geth::GethNode, + node_implementations::lighthouse_geth::LighthouseGethNode, + node_implementations::substrate::SubstrateNode, }; use revive_dt_node_interaction::EthereumNode; use tracing::info; diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 9703794..8073c11 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -13,6 +13,7 @@ anyhow = { workspace = true } alloy = { workspace = true } futures = { workspace = true } tracing = { workspace = true } +tower = { workspace = true } tokio = { workspace = true } revive-common = { workspace = true } diff --git a/crates/node/src/constants.rs b/crates/node/src/constants.rs index 70cbf66..30fb40c 100644 --- a/crates/node/src/constants.rs +++ b/crates/node/src/constants.rs @@ -1,5 +1,10 @@ +use alloy::primitives::ChainId; + /// This constant defines how much Wei accounts are pre-seeded with in genesis. /// /// Note: After changing this number, check that the tests for substrate work as we encountered /// some issues with different values of the initial balance on substrate. pub const INITIAL_BALANCE: u128 = 10u128.pow(37); + +/// The chain id used for all of the chains spawned by the framework. +pub const CHAIN_ID: ChainId = 420420420; diff --git a/crates/node/src/helpers/mod.rs b/crates/node/src/helpers/mod.rs new file mode 100644 index 0000000..8f4ed89 --- /dev/null +++ b/crates/node/src/helpers/mod.rs @@ -0,0 +1,3 @@ +mod process; + +pub use process::*; diff --git a/crates/node/src/process.rs b/crates/node/src/helpers/process.rs similarity index 100% rename from crates/node/src/process.rs rename to crates/node/src/helpers/process.rs diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 5cbedf8..8607dcc 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -3,12 +3,10 @@ use alloy::genesis::Genesis; use revive_dt_node_interaction::EthereumNode; -pub mod common; pub mod constants; -pub mod geth; -pub mod lighthouse_geth; -pub mod process; -pub mod substrate; +pub mod helpers; +pub mod node_implementations; +pub mod provider_utils; /// An abstract interface for testing nodes. pub trait Node: EthereumNode { diff --git a/crates/node/src/geth.rs b/crates/node/src/node_implementations/geth.rs similarity index 94% rename from crates/node/src/geth.rs rename to crates/node/src/node_implementations/geth.rs index ae0d0ce..bea3c0b 100644 --- a/crates/node/src/geth.rs +++ b/crates/node/src/node_implementations/geth.rs @@ -20,12 +20,9 @@ use alloy::{ network::{Ethereum, EthereumWallet, NetworkWallet}, primitives::{Address, BlockHash, BlockNumber, BlockTimestamp, StorageKey, TxHash, U256}, providers::{ - Identity, Provider, ProviderBuilder, RootProvider, + Provider, ext::DebugApi, - fillers::{ - CachedNonceManager, ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller, - WalletFiller, - }, + fillers::{CachedNonceManager, ChainIdFiller, NonceFiller}, }, rpc::types::{ EIP1186AccountProofResponse, TransactionReceipt, TransactionRequest, @@ -50,9 +47,9 @@ use revive_dt_node_interaction::{EthereumNode, MinedBlockInformation}; use crate::{ Node, - common::FallbackGasFiller, - constants::INITIAL_BALANCE, - process::{Process, ProcessReadinessWaitBehavior}, + constants::{CHAIN_ID, INITIAL_BALANCE}, + helpers::{Process, ProcessReadinessWaitBehavior}, + provider_utils::{ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider}, }; static NODE_COUNT: AtomicU32 = AtomicU32::new(0); @@ -77,19 +74,7 @@ pub struct GethNode { start_timeout: Duration, wallet: Arc, nonce_manager: CachedNonceManager, - chain_id_filler: ChainIdFiller, - provider: OnceCell< - FillProvider< - JoinFill< - JoinFill< - JoinFill, ChainIdFiller>, - NonceFiller, - >, - WalletFiller>, - >, - RootProvider, - >, - >, + provider: OnceCell>>, } impl GethNode { @@ -138,7 +123,6 @@ impl GethNode { handle: None, start_timeout: geth_configuration.start_timeout_ms, wallet: wallet.clone(), - chain_id_filler: Default::default(), nonce_manager: Default::default(), provider: Default::default(), } @@ -265,26 +249,18 @@ impl GethNode { Ok(self) } - async fn provider( - &self, - ) -> anyhow::Result< - FillProvider, impl Provider + Clone, Ethereum>, - > { + async fn provider(&self) -> anyhow::Result>> { self.provider .get_or_try_init(|| async move { - ProviderBuilder::new() - .disable_recommended_fillers() - .filler(FallbackGasFiller::new( - 25_000_000, - 1_000_000_000, - 1_000_000_000, - )) - .filler(self.chain_id_filler.clone()) - .filler(NonceFiller::new(self.nonce_manager.clone())) - .wallet(self.wallet.clone()) - .connect(&self.connection_string) - .await - .map_err(Into::into) + construct_concurrency_limited_provider::( + self.connection_string.as_str(), + FallbackGasFiller::default(), + ChainIdFiller::new(Some(CHAIN_ID)), + NonceFiller::new(self.nonce_manager.clone()), + self.wallet.clone(), + ) + .await + .context("Failed to construct the provider") }) .await .cloned() @@ -568,12 +544,12 @@ impl EthereumNode for GethNode { } } -pub struct GethNodeResolver, P: Provider> { +pub struct GethNodeResolver { id: u32, - provider: FillProvider, + provider: ConcreteProvider>, } -impl, P: Provider> ResolverApi for GethNodeResolver { +impl ResolverApi for GethNodeResolver { #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] fn chain_id( &self, diff --git a/crates/node/src/lighthouse_geth.rs b/crates/node/src/node_implementations/lighthouse_geth.rs similarity index 91% rename from crates/node/src/lighthouse_geth.rs rename to crates/node/src/node_implementations/lighthouse_geth.rs index 7ff7b01..725a8a6 100644 --- a/crates/node/src/lighthouse_geth.rs +++ b/crates/node/src/node_implementations/lighthouse_geth.rs @@ -31,23 +31,16 @@ use alloy::{ Address, BlockHash, BlockNumber, BlockTimestamp, StorageKey, TxHash, U256, address, }, providers::{ - Identity, Provider, ProviderBuilder, RootProvider, + Provider, ext::DebugApi, - fillers::{ - CachedNonceManager, ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller, - WalletFiller, + fillers::{CachedNonceManager, ChainIdFiller, FillProvider, NonceFiller, TxFiller}, + }, + rpc::types::{ + EIP1186AccountProofResponse, TransactionReceipt, TransactionRequest, + trace::geth::{ + DiffMode, GethDebugTracingOptions, GethTrace, PreStateConfig, PreStateFrame, }, }, - rpc::{ - client::{BuiltInConnectionString, ClientBuilder, RpcClient}, - types::{ - EIP1186AccountProofResponse, TransactionReceipt, TransactionRequest, - trace::geth::{ - DiffMode, GethDebugTracingOptions, GethTrace, PreStateConfig, PreStateFrame, - }, - }, - }, - transports::layers::RetryBackoffLayer, }; use anyhow::Context as _; use futures::{Stream, StreamExt}; @@ -67,9 +60,9 @@ use revive_dt_node_interaction::{EthereumNode, MinedBlockInformation}; use crate::{ Node, - common::FallbackGasFiller, - constants::INITIAL_BALANCE, - process::{Process, ProcessReadinessWaitBehavior}, + constants::{CHAIN_ID, INITIAL_BALANCE}, + helpers::{Process, ProcessReadinessWaitBehavior}, + provider_utils::{ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider}, }; static NODE_COUNT: AtomicU32 = AtomicU32::new(0); @@ -110,30 +103,8 @@ pub struct LighthouseGethNode { wallet: Arc, nonce_manager: CachedNonceManager, - persistent_http_provider: OnceCell< - FillProvider< - JoinFill< - JoinFill< - JoinFill, ChainIdFiller>, - NonceFiller, - >, - WalletFiller>, - >, - RootProvider, - >, - >, - persistent_ws_provider: OnceCell< - FillProvider< - JoinFill< - JoinFill< - JoinFill, ChainIdFiller>, - NonceFiller, - >, - WalletFiller>, - >, - RootProvider, - >, - >, + persistent_http_provider: OnceCell>>, + persistent_ws_provider: OnceCell>>, http_provider_requests_semaphore: LazyLock, } @@ -262,7 +233,7 @@ impl LighthouseGethNode { network_parameters: NetworkParameters { preset: NetworkPreset::Mainnet, seconds_per_slot: 12, - network_id: 420420420, + network_id: CHAIN_ID, deposit_contract_address: address!("0x00000000219ab540356cBB839Cbe05303d7705Fa"), altair_fork_epoch: 0, bellatrix_fork_epoch: 0, @@ -395,31 +366,18 @@ impl LighthouseGethNode { err(Debug), )] #[allow(clippy::type_complexity)] - async fn ws_provider( - &self, - ) -> anyhow::Result< - FillProvider< - JoinFill< - JoinFill< - JoinFill, ChainIdFiller>, - NonceFiller, - >, - WalletFiller>, - >, - RootProvider, - >, - > { + async fn ws_provider(&self) -> anyhow::Result>> { self.persistent_ws_provider .get_or_try_init(|| async move { - info!("Initializing the WS provider of the lighthouse node"); - let client = ClientBuilder::default() - .layer(RetryBackoffLayer::new(10, 1000, 100)) - .connect_with(BuiltInConnectionString::Ws( - self.ws_connection_string.as_str().parse().unwrap(), - None, - )) - .await?; - Ok(self.provider(client)).inspect(|_| info!("Initialized the WS provider")) + construct_concurrency_limited_provider::( + self.ws_connection_string.as_str(), + FallbackGasFiller::default(), + ChainIdFiller::new(Some(CHAIN_ID)), + NonceFiller::new(self.nonce_manager.clone()), + self.wallet.clone(), + ) + .await + .context("Failed to construct the provider") }) .await .cloned() @@ -434,57 +392,23 @@ impl LighthouseGethNode { #[allow(clippy::type_complexity)] async fn http_provider( &self, - ) -> anyhow::Result< - FillProvider< - JoinFill< - JoinFill< - JoinFill, ChainIdFiller>, - NonceFiller, - >, - WalletFiller>, - >, - RootProvider, - >, - > { + ) -> anyhow::Result>> { self.persistent_http_provider .get_or_try_init(|| async move { - info!("Initializing the HTTP provider of the lighthouse node"); - let client = ClientBuilder::default() - .layer(RetryBackoffLayer::new(10, 1000, 100)) - .connect_with(BuiltInConnectionString::Http( - self.http_connection_string.as_str().parse().unwrap(), - )) - .await?; - Ok(self.provider(client)) + construct_concurrency_limited_provider::( + self.http_connection_string.as_str(), + FallbackGasFiller::default(), + ChainIdFiller::new(Some(CHAIN_ID)), + NonceFiller::new(self.nonce_manager.clone()), + self.wallet.clone(), + ) + .await + .context("Failed to construct the provider") }) .await .cloned() } - #[allow(clippy::type_complexity)] - fn provider( - &self, - rpc_client: RpcClient, - ) -> FillProvider< - JoinFill< - JoinFill, ChainIdFiller>, NonceFiller>, - WalletFiller>, - >, - RootProvider, - > { - ProviderBuilder::new() - .disable_recommended_fillers() - .filler(FallbackGasFiller::new( - 25_000_000, - 1_000_000_000, - 1_000_000_000, - )) - .filler(ChainIdFiller::new(Some(420420420))) - .filler(NonceFiller::new(self.nonce_manager.clone())) - .wallet(self.wallet.clone()) - .connect_client(rpc_client) - } - /// Funds all of the accounts in the Ethereum wallet from the initially funded account. #[instrument( level = "info", @@ -511,7 +435,7 @@ impl LighthouseGethNode { .to(address) .nonce(nonce as _) .value(INITIAL_BALANCE.try_into().unwrap()); - transaction.chain_id = Some(420420420); + transaction.chain_id = Some(CHAIN_ID); self.submit_transaction(transaction).await }), ) diff --git a/crates/node/src/node_implementations/mod.rs b/crates/node/src/node_implementations/mod.rs new file mode 100644 index 0000000..83b8371 --- /dev/null +++ b/crates/node/src/node_implementations/mod.rs @@ -0,0 +1,3 @@ +pub mod geth; +pub mod lighthouse_geth; +pub mod substrate; diff --git a/crates/node/src/substrate.rs b/crates/node/src/node_implementations/substrate.rs similarity index 96% rename from crates/node/src/substrate.rs rename to crates/node/src/node_implementations/substrate.rs index 4e61456..359e2e6 100644 --- a/crates/node/src/substrate.rs +++ b/crates/node/src/node_implementations/substrate.rs @@ -23,12 +23,9 @@ use alloy::{ TxHash, U256, }, providers::{ - Identity, Provider, ProviderBuilder, RootProvider, + Provider, ext::DebugApi, - fillers::{ - CachedNonceManager, ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller, - WalletFiller, - }, + fillers::{CachedNonceManager, ChainIdFiller, NonceFiller}, }, rpc::types::{ EIP1186AccountProofResponse, TransactionReceipt, TransactionRequest, @@ -55,9 +52,9 @@ use tracing::instrument; use crate::{ Node, - common::FallbackGasFiller, - constants::INITIAL_BALANCE, - process::{Process, ProcessReadinessWaitBehavior}, + constants::{CHAIN_ID, INITIAL_BALANCE}, + helpers::{Process, ProcessReadinessWaitBehavior}, + provider_utils::{ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider}, }; static NODE_COUNT: AtomicU32 = AtomicU32::new(0); @@ -79,21 +76,7 @@ pub struct SubstrateNode { eth_proxy_process: Option, wallet: Arc, nonce_manager: CachedNonceManager, - chain_id_filler: ChainIdFiller, - #[allow(clippy::type_complexity)] - provider: OnceCell< - FillProvider< - JoinFill< - JoinFill< - JoinFill, ChainIdFiller>, - NonceFiller, - >, - WalletFiller>, - >, - RootProvider, - ReviveNetwork, - >, - >, + provider: OnceCell>>, } impl SubstrateNode { @@ -143,7 +126,6 @@ impl SubstrateNode { substrate_process: None, eth_proxy_process: None, wallet: wallet.clone(), - chain_id_filler: Default::default(), nonce_manager: Default::default(), provider: Default::default(), } @@ -358,29 +340,18 @@ impl SubstrateNode { async fn provider( &self, - ) -> anyhow::Result< - FillProvider< - impl TxFiller, - impl Provider + Clone, - ReviveNetwork, - >, - > { + ) -> anyhow::Result>> { self.provider .get_or_try_init(|| async move { - ProviderBuilder::new() - .disable_recommended_fillers() - .network::() - .filler(FallbackGasFiller::new( - 25_000_000, - 1_000_000_000, - 1_000_000_000, - )) - .filler(self.chain_id_filler.clone()) - .filler(NonceFiller::new(self.nonce_manager.clone())) - .wallet(self.wallet.clone()) - .connect(&self.rpc_url) - .await - .map_err(Into::into) + construct_concurrency_limited_provider::( + self.rpc_url.as_str(), + FallbackGasFiller::default(), + ChainIdFiller::new(Some(CHAIN_ID)), + NonceFiller::new(self.nonce_manager.clone()), + self.wallet.clone(), + ) + .await + .context("Failed to construct the provider") }) .await .cloned() @@ -436,7 +407,12 @@ impl EthereumNode for SubstrateNode { &self, transaction: TransactionRequest, ) -> Pin> + '_>> { + static SEMAPHORE: std::sync::LazyLock = + std::sync::LazyLock::new(|| tokio::sync::Semaphore::new(500)); + Box::pin(async move { + let _permit = SEMAPHORE.acquire().await?; + let receipt = self .provider() .await @@ -572,14 +548,12 @@ impl EthereumNode for SubstrateNode { } } -pub struct SubstrateNodeResolver, P: Provider> { +pub struct SubstrateNodeResolver { id: u32, - provider: FillProvider, + provider: ConcreteProvider>, } -impl, P: Provider> ResolverApi - for SubstrateNodeResolver -{ +impl ResolverApi for SubstrateNodeResolver { #[instrument(level = "info", skip_all, fields(substrate_node_id = self.id))] fn chain_id( &self, diff --git a/crates/node/src/provider_utils/concurrency_limiter.rs b/crates/node/src/provider_utils/concurrency_limiter.rs new file mode 100644 index 0000000..585fd73 --- /dev/null +++ b/crates/node/src/provider_utils/concurrency_limiter.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; + +use alloy::transports::BoxFuture; +use tokio::sync::Semaphore; +use tower::{Layer, Service}; + +#[derive(Clone, Debug)] +pub struct ConcurrencyLimiterLayer { + semaphore: Arc, +} + +impl ConcurrencyLimiterLayer { + pub fn new(permit_count: usize) -> Self { + Self { + semaphore: Arc::new(Semaphore::new(permit_count)), + } + } +} + +impl Layer for ConcurrencyLimiterLayer { + type Service = ConcurrencyLimiterService; + + fn layer(&self, inner: S) -> Self::Service { + ConcurrencyLimiterService { + service: inner, + semaphore: self.semaphore.clone(), + } + } +} + +#[derive(Clone)] +pub struct ConcurrencyLimiterService { + service: S, + semaphore: Arc, +} + +impl Service for ConcurrencyLimiterService +where + S: Service + Send, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let semaphore = self.semaphore.clone(); + let future = self.service.call(req); + + Box::pin(async move { + let _permit = semaphore + .acquire() + .await + .expect("Semaphore has been closed"); + future.await + }) + } +} diff --git a/crates/node/src/common.rs b/crates/node/src/provider_utils/fallback_gas_provider.rs similarity index 93% rename from crates/node/src/common.rs rename to crates/node/src/provider_utils/fallback_gas_provider.rs index f260062..ff74ea2 100644 --- a/crates/node/src/common.rs +++ b/crates/node/src/provider_utils/fallback_gas_provider.rs @@ -30,6 +30,12 @@ impl FallbackGasFiller { } } +impl Default for FallbackGasFiller { + fn default() -> Self { + FallbackGasFiller::new(25_000_000, 1_000_000_000, 1_000_000_000) + } +} + impl TxFiller for FallbackGasFiller where N: Network, diff --git a/crates/node/src/provider_utils/mod.rs b/crates/node/src/provider_utils/mod.rs new file mode 100644 index 0000000..b0738da --- /dev/null +++ b/crates/node/src/provider_utils/mod.rs @@ -0,0 +1,7 @@ +mod concurrency_limiter; +mod fallback_gas_provider; +mod provider; + +pub use concurrency_limiter::*; +pub use fallback_gas_provider::*; +pub use provider::*; diff --git a/crates/node/src/provider_utils/provider.rs b/crates/node/src/provider_utils/provider.rs new file mode 100644 index 0000000..d795bb4 --- /dev/null +++ b/crates/node/src/provider_utils/provider.rs @@ -0,0 +1,63 @@ +use std::sync::LazyLock; + +use alloy::{ + network::{Network, NetworkWallet, TransactionBuilder4844}, + providers::{ + Identity, ProviderBuilder, RootProvider, + fillers::{ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller, WalletFiller}, + }, + rpc::client::ClientBuilder, +}; +use anyhow::{Context, Result}; + +use crate::provider_utils::{ConcurrencyLimiterLayer, FallbackGasFiller}; + +pub type ConcreteProvider = FillProvider< + JoinFill< + JoinFill, ChainIdFiller>, NonceFiller>, + WalletFiller, + >, + RootProvider, + N, +>; + +pub async fn construct_concurrency_limited_provider( + rpc_url: &str, + fallback_gas_filler: FallbackGasFiller, + chain_id_filler: ChainIdFiller, + nonce_filler: NonceFiller, + wallet: W, +) -> Result> +where + N: Network, + W: NetworkWallet, + Identity: TxFiller, + FallbackGasFiller: TxFiller, + ChainIdFiller: TxFiller, + NonceFiller: TxFiller, + WalletFiller: TxFiller, +{ + // This is a global limit on the RPC concurrency that applies to all of the providers created + // by the framework. With this limit, it means that we can have a maximum of 1500 concurrent + // requests at any point of time and no more than that. This is done in an effort to stabilize + // the framework from some of the interment issues that we've been seeing related to RPC calls. + static GLOBAL_CONCURRENCY_LIMITER_LAYER: LazyLock = + LazyLock::new(|| ConcurrencyLimiterLayer::new(1500)); + + let client = ClientBuilder::default() + .layer(GLOBAL_CONCURRENCY_LIMITER_LAYER.clone()) + .connect(rpc_url) + .await + .context("Failed to construct the RPC client")?; + + let provider = ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .filler(fallback_gas_filler) + .filler(chain_id_filler) + .filler(nonce_filler) + .wallet(wallet) + .connect_client(client); + + Ok(provider) +} diff --git a/run_tests.sh b/run_tests.sh index fe0dfe1..14506cb 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -90,7 +90,7 @@ echo "" # Run the tool RUST_LOG="info,alloy_pubsub::service=error" cargo run --release -- test \ - --platform revive-dev-node-polkavm-resolc \ + --platform revive-dev-node-revm-solc \ --corpus "$CORPUS_FILE" \ --working-directory "$WORKDIR" \ --concurrency.number-of-nodes 5 \