diff --git a/Cargo.lock b/Cargo.lock index 44029af..d0e8908 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5658,6 +5658,7 @@ version = "0.1.0" dependencies = [ "alloy", "anyhow", + "async-stream", "futures", "revive-common", "revive-dt-common", diff --git a/Cargo.toml b/Cargo.toml index 980866a..63eab61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ revive-dt-report = { version = "0.1.0", path = "crates/report" } revive-dt-solc-binaries = { version = "0.1.0", path = "crates/solc-binaries" } anyhow = "1.0" +async-stream = { version = "0.3.6" } bson = { version = "2.15.0" } cacache = { version = "13.1.0" } clap = { version = "4", features = ["derive"] } @@ -73,7 +74,7 @@ revive-solc-json-interface = { git = "https://github.com/paritytech/revive", rev revive-common = { git = "https://github.com/paritytech/revive", rev = "3389865af7c3ff6f29a586d82157e8bc573c1a8e" } revive-differential = { git = "https://github.com/paritytech/revive", rev = "3389865af7c3ff6f29a586d82157e8bc573c1a8e" } -zombienet-sdk = { git = "https://github.com/paritytech/zombienet-sdk.git", rev ="891f6554354ce466abd496366dbf8b4f82141241" } +zombienet-sdk = { git = "https://github.com/paritytech/zombienet-sdk.git", rev = "891f6554354ce466abd496366dbf8b4f82141241" } [workspace.dependencies.alloy] version = "1.0.37" diff --git a/crates/core/src/differential_benchmarks/driver.rs b/crates/core/src/differential_benchmarks/driver.rs index abdc6fd..b1ee1f8 100644 --- a/crates/core/src/differential_benchmarks/driver.rs +++ b/crates/core/src/differential_benchmarks/driver.rs @@ -35,7 +35,7 @@ use revive_dt_format::{ }, traits::{ResolutionContext, ResolverApi}, }; -use tokio::sync::{Mutex, mpsc::UnboundedSender}; +use tokio::sync::{Mutex, OnceCell, mpsc::UnboundedSender}; use tracing::{Instrument, Span, debug, error, field::display, info, info_span, instrument}; use crate::{ @@ -123,13 +123,7 @@ where &self.platform_information.reporter, ) .await - .inspect_err(|err| { - error!( - ?err, - platform_identifier = %self.platform_information.platform.platform_identifier(), - "Pre-linking compilation failed" - ) - }) + .inspect_err(|err| error!(?err, "Pre-linking compilation failed")) .context("Failed to produce the pre-linking compiled contracts")?; let mut deployed_libraries = None::>; @@ -137,13 +131,7 @@ where .test_definition .metadata .contract_sources() - .inspect_err(|err| { - error!( - ?err, - platform_identifier = %self.platform_information.platform.platform_identifier(), - "Failed to retrieve contract sources from metadata" - ) - }) + .inspect_err(|err| error!(?err, "Failed to retrieve contract sources from metadata")) .context("Failed to get the contract instances from the metadata file")?; for library_instance in self .test_definition @@ -195,16 +183,11 @@ where error!( ?err, %library_instance, - platform_identifier = %self.platform_information.platform.platform_identifier(), "Failed to deploy the library" ) })?; - debug!( - ?library_instance, - platform_identifier = %self.platform_information.platform.platform_identifier(), - "Deployed library" - ); + debug!(?library_instance, "Deployed library"); let library_address = receipt .contract_address @@ -227,13 +210,7 @@ where &self.platform_information.reporter, ) .await - .inspect_err(|err| { - error!( - ?err, - platform_identifier = %self.platform_information.platform.platform_identifier(), - "Post-linking compilation failed" - ) - }) + .inspect_err(|err| error!(?err, "Post-linking compilation failed")) .context("Failed to compile the post-link contracts")?; self.execution_state = ExecutionState::new( @@ -269,7 +246,6 @@ where skip_all, fields( driver_id = self.driver_id, - platform_identifier = %self.platform_information.platform.platform_identifier(), %step_path, ), err(Debug), @@ -309,11 +285,7 @@ where .handle_function_call_execution(step, deployment_receipts) .await .context("Failed to handle the function call execution")?; - let tracing_result = self - .handle_function_call_call_frame_tracing(execution_receipt.transaction_hash) - .await - .context("Failed to handle the function call call frame tracing")?; - self.handle_function_call_variable_assignment(step, &tracing_result) + self.handle_function_call_variable_assignment(step, execution_receipt.transaction_hash) .await .context("Failed to handle function call variable assignment")?; Ok(1) @@ -417,15 +389,19 @@ where async fn handle_function_call_variable_assignment( &mut self, step: &FunctionCallStep, - tracing_result: &CallFrame, + tx_hash: TxHash, ) -> Result<()> { let Some(ref assignments) = step.variable_assignments else { return Ok(()); }; // Handling the return data variable assignments. + let callframe = OnceCell::new(); for (variable_name, output_word) in assignments.return_data.iter().zip( - tracing_result + callframe + .get_or_try_init(|| self.handle_function_call_call_frame_tracing(tx_hash)) + .await + .context("Failed to get the callframe trace for transaction")? .output .as_ref() .unwrap_or_default() @@ -547,7 +523,6 @@ where skip_all, fields( driver_id = self.driver_id, - platform_identifier = %self.platform_information.platform.platform_identifier(), %contract_instance, %deployer ), @@ -590,7 +565,6 @@ where skip_all, fields( driver_id = self.driver_id, - platform_identifier = %self.platform_information.platform.platform_identifier(), %contract_instance, %deployer ), @@ -764,7 +738,9 @@ where .instrument(info_span!("Polling for receipt")) }, ) + .instrument(info_span!("Polling for receipt", %transaction_hash)) .await + .inspect(|_| info!("Found the transaction receipt")) } // endregion:Transaction Execution } diff --git a/crates/core/src/differential_benchmarks/entry_point.rs b/crates/core/src/differential_benchmarks/entry_point.rs index 7702d52..fd80aa3 100644 --- a/crates/core/src/differential_benchmarks/entry_point.rs +++ b/crates/core/src/differential_benchmarks/entry_point.rs @@ -8,7 +8,7 @@ use revive_dt_common::types::PrivateKeyAllocator; use revive_dt_core::Platform; use revive_dt_format::steps::{Step, StepIdx, StepPath}; use tokio::sync::Mutex; -use tracing::{error, info, info_span, instrument, warn}; +use tracing::{Instrument, error, info, info_span, instrument, warn}; use revive_dt_config::{BenchmarkingContext, Context}; use revive_dt_report::Reporter; @@ -159,12 +159,15 @@ pub async fn handle_differential_benchmarks( futures::future::try_join( watcher.run(), - driver.execute_all().inspect(|_| { - info!("All transactions submitted - driver completed execution"); - watcher_tx - .send(WatcherEvent::AllTransactionsSubmitted) - .unwrap() - }), + driver + .execute_all() + .instrument(info_span!("Executing Benchmarks", %platform_identifier)) + .inspect(|_| { + info!("All transactions submitted - driver completed execution"); + watcher_tx + .send(WatcherEvent::AllTransactionsSubmitted) + .unwrap() + }), ) .await .context("Failed to run the driver and executor") diff --git a/crates/core/src/differential_benchmarks/watcher.rs b/crates/core/src/differential_benchmarks/watcher.rs index 12ea840..66634c8 100644 --- a/crates/core/src/differential_benchmarks/watcher.rs +++ b/crates/core/src/differential_benchmarks/watcher.rs @@ -118,8 +118,9 @@ impl Watcher { } info!( - remaining_transactions = watch_for_transaction_hashes.read().await.len(), + block_number = block.block_number, block_tx_count = block.transaction_hashes.len(), + remaining_transactions = watch_for_transaction_hashes.read().await.len(), "Observed a block" ); diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 9030b86..dfe0827 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -11,6 +11,7 @@ rust-version.workspace = true [dependencies] anyhow = { workspace = true } alloy = { workspace = true } +async-stream = { workspace = true } futures = { workspace = true } tracing = { workspace = true } tower = { workspace = true } diff --git a/crates/node/src/node_implementations/substrate.rs b/crates/node/src/node_implementations/substrate.rs index a92b3da..8a4d99e 100644 --- a/crates/node/src/node_implementations/substrate.rs +++ b/crates/node/src/node_implementations/substrate.rs @@ -36,7 +36,8 @@ use alloy::{ }, }; use anyhow::Context as _; -use futures::{Stream, StreamExt}; +use async_stream::stream; +use futures::Stream; use revive_common::EVMVersion; use revive_dt_common::fs::clear_directory; use revive_dt_format::traits::ResolverApi; @@ -245,6 +246,12 @@ impl SubstrateNode { .arg("all") .arg("--rpc-max-connections") .arg(u32::MAX.to_string()) + .arg("--consensus") + .arg("manual-seal-12000") + .arg("--pool-limit") + .arg(u32::MAX.to_string()) + .arg("--pool-kbytes") + .arg(u32::MAX.to_string()) .env("RUST_LOG", Self::SUBSTRATE_LOG_ENV) .stdout(stdout_file) .stderr(stderr_file); @@ -508,37 +515,46 @@ impl EthereumNode for SubstrateNode { + '_, >, > { + fn create_stream( + provider: ConcreteProvider>, + ) -> impl Stream { + stream! { + let mut block_number = provider.get_block_number().await.expect("Failed to get the block number"); + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + + let Ok(Some(block)) = provider.get_block_by_number(BlockNumberOrTag::Number(block_number)).await + else { + continue; + }; + + block_number += 1; + yield MinedBlockInformation { + block_number: block.number(), + block_timestamp: block.header.timestamp, + mined_gas: block.header.gas_used as _, + block_gas_limit: block.header.gas_limit, + transaction_hashes: block + .transactions + .into_hashes() + .as_hashes() + .expect("Must be hashes") + .to_vec(), + }; + }; + } + } + Box::pin(async move { let provider = self .provider() .await - .context("Failed to create the provider for block subscription")?; - let mut block_subscription = provider - .watch_full_blocks() - .await - .context("Failed to create the blocks stream")?; - block_subscription.set_channel_size(0xFFFF); - block_subscription.set_poll_interval(Duration::from_secs(1)); - let block_stream = block_subscription.into_stream(); + .context("Failed to create the provider for a block subscription")?; - let mined_block_information_stream = block_stream.filter_map(|block| async { - let block = block.ok()?; - Some(MinedBlockInformation { - block_number: block.number(), - block_timestamp: block.header.timestamp, - mined_gas: block.header.gas_used as _, - block_gas_limit: block.header.gas_limit, - transaction_hashes: block - .transactions - .into_hashes() - .as_hashes() - .expect("Must be hashes") - .to_vec(), - }) - }); + let stream = Box::pin(create_stream(provider)) + as Pin>>; - Ok(Box::pin(mined_block_information_stream) - as Pin>>) + Ok(stream) }) } } diff --git a/crates/node/src/node_implementations/zombienet.rs b/crates/node/src/node_implementations/zombienet.rs index 50cae7e..498e1ff 100644 --- a/crates/node/src/node_implementations/zombienet.rs +++ b/crates/node/src/node_implementations/zombienet.rs @@ -55,7 +55,8 @@ use alloy::{ }; use anyhow::Context as _; -use futures::{Stream, StreamExt}; +use async_stream::stream; +use futures::Stream; use revive_common::EVMVersion; use revive_dt_common::fs::clear_directory; use revive_dt_config::*; @@ -122,6 +123,8 @@ impl ZombienetNode { const PARACHAIN_ID: u32 = 100; const ETH_RPC_BASE_PORT: u16 = 8545; + const PROXY_LOG_ENV: &str = "info,eth-rpc=debug"; + const ETH_RPC_READY_MARKER: &str = "Running JSON-RPC server"; const EXPORT_CHAINSPEC_COMMAND: &str = "build-spec"; @@ -188,12 +191,13 @@ impl ZombienetNode { .with_node(|node| node.with_name("bob")) }) .with_global_settings(|global_settings| { - global_settings.with_base_dir(&self.base_directory) + // global_settings.with_base_dir(&self.base_directory) + global_settings }) .with_parachain(|parachain| { parachain .with_id(Self::PARACHAIN_ID) - .with_chain_spec_path(template_chainspec_path.to_str().unwrap()) + .with_chain_spec_path(template_chainspec_path.to_path_buf()) .with_chain("asset-hub-westend-local") .with_collator(|node_config| { node_config @@ -247,6 +251,7 @@ impl ZombienetNode { .arg(u32::MAX.to_string()) .arg("--rpc-port") .arg(eth_rpc_port.to_string()) + .env("RUST_LOG", Self::PROXY_LOG_ENV) .stdout(stdout_file) .stderr(stderr_file); }, @@ -557,37 +562,46 @@ impl EthereumNode for ZombienetNode { + '_, >, > { + fn create_stream( + provider: ConcreteProvider>, + ) -> impl Stream { + stream! { + let mut block_number = provider.get_block_number().await.expect("Failed to get the block number"); + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + + let Ok(Some(block)) = provider.get_block_by_number(BlockNumberOrTag::Number(block_number)).await + else { + continue; + }; + + block_number += 1; + yield MinedBlockInformation { + block_number: block.number(), + block_timestamp: block.header.timestamp, + mined_gas: block.header.gas_used as _, + block_gas_limit: block.header.gas_limit, + transaction_hashes: block + .transactions + .into_hashes() + .as_hashes() + .expect("Must be hashes") + .to_vec(), + }; + }; + } + } + Box::pin(async move { let provider = self .provider() .await - .context("Failed to create the provider for block subscription")?; - let mut block_subscription = provider - .watch_full_blocks() - .await - .context("Failed to create the blocks stream")?; - block_subscription.set_channel_size(0xFFFF); - block_subscription.set_poll_interval(Duration::from_secs(1)); - let block_stream = block_subscription.into_stream(); + .context("Failed to create the provider for a block subscription")?; - let mined_block_information_stream = block_stream.filter_map(|block| async { - let block = block.ok()?; - Some(MinedBlockInformation { - block_number: block.number(), - block_timestamp: block.header.timestamp, - mined_gas: block.header.gas_used as _, - block_gas_limit: block.header.gas_limit, - transaction_hashes: block - .transactions - .into_hashes() - .as_hashes() - .expect("Must be hashes") - .to_vec(), - }) - }); + let stream = Box::pin(create_stream(provider)) + as Pin>>; - Ok(Box::pin(mined_block_information_stream) - as Pin>>) + Ok(stream) }) } } diff --git a/crates/node/src/provider_utils/fallback_gas_provider.rs b/crates/node/src/provider_utils/fallback_gas_filler.rs similarity index 74% rename from crates/node/src/provider_utils/fallback_gas_provider.rs rename to crates/node/src/provider_utils/fallback_gas_filler.rs index ff74ea2..c218316 100644 --- a/crates/node/src/provider_utils/fallback_gas_provider.rs +++ b/crates/node/src/provider_utils/fallback_gas_filler.rs @@ -7,6 +7,10 @@ use alloy::{ transports::TransportResult, }; +// Percentage padding applied to estimated gas (e.g. 120 = 20% padding) +const GAS_ESTIMATE_PADDING_NUMERATOR: u64 = 120; +const GAS_ESTIMATE_PADDING_DENOMINATOR: u64 = 100; + #[derive(Clone, Debug)] pub struct FallbackGasFiller { inner: GasFiller, @@ -56,8 +60,6 @@ where provider: &P, tx: &::TransactionRequest, ) -> TransportResult { - // Try to fetch GasFiller’s “fillable” (gas_price, base_fee, estimate_gas, …) - // If it errors (i.e. tx would revert under eth_estimateGas), swallow it. match self.inner.prepare(provider, tx).await { Ok(fill) => Ok(Some(fill)), Err(_) => Ok(None), @@ -70,8 +72,17 @@ where mut tx: alloy::providers::SendableTx, ) -> TransportResult> { if let Some(fill) = fillable { - // our inner GasFiller succeeded — use it - self.inner.fill(fill, tx).await + let mut tx = self.inner.fill(fill, tx).await?; + if let Some(builder) = tx.as_mut_builder() { + if let Some(estimated) = builder.gas_limit() { + let padded = estimated + .checked_mul(GAS_ESTIMATE_PADDING_NUMERATOR) + .and_then(|v| v.checked_div(GAS_ESTIMATE_PADDING_DENOMINATOR)) + .unwrap_or(u64::MAX); + builder.set_gas_limit(padded); + } + } + Ok(tx) } else { if let Some(builder) = tx.as_mut_builder() { builder.set_gas_limit(self.default_gas_limit); diff --git a/crates/node/src/provider_utils/mod.rs b/crates/node/src/provider_utils/mod.rs index b0738da..c556a71 100644 --- a/crates/node/src/provider_utils/mod.rs +++ b/crates/node/src/provider_utils/mod.rs @@ -1,7 +1,7 @@ mod concurrency_limiter; -mod fallback_gas_provider; +mod fallback_gas_filler; mod provider; pub use concurrency_limiter::*; -pub use fallback_gas_provider::*; +pub use fallback_gas_filler::*; pub use provider::*; diff --git a/crates/node/src/provider_utils/provider.rs b/crates/node/src/provider_utils/provider.rs index 6862537..f10b3b6 100644 --- a/crates/node/src/provider_utils/provider.rs +++ b/crates/node/src/provider_utils/provider.rs @@ -10,7 +10,7 @@ use alloy::{ }; use anyhow::{Context, Result}; use revive_dt_common::futures::{PollingWaitBehavior, poll}; -use tracing::debug; +use tracing::{Instrument, debug, info, info_span}; use crate::provider_utils::{ConcurrencyLimiterLayer, FallbackGasFiller}; @@ -44,7 +44,7 @@ where // requests at any point of time and no more than that. This is done in an effort to stabilize // the framework from some of the interment issues that we've been seeing related to RPC calls. static GLOBAL_CONCURRENCY_LIMITER_LAYER: LazyLock = - LazyLock::new(|| ConcurrencyLimiterLayer::new(10)); + LazyLock::new(|| ConcurrencyLimiterLayer::new(500)); let client = ClientBuilder::default() .layer(GLOBAL_CONCURRENCY_LIMITER_LAYER.clone()) @@ -117,12 +117,16 @@ where async move { match provider.get_transaction_receipt(tx_hash).await { - Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)), + Ok(Some(receipt)) => { + info!("Found the transaction receipt"); + Ok(ControlFlow::Break(receipt)) + } _ => Ok(ControlFlow::Continue(())), } } }, ) + .instrument(info_span!("Polling for receipt", %tx_hash)) .await .context(format!("Polling for receipt failed for {tx_hash}")) }