diff --git a/Cargo.lock b/Cargo.lock index 44029af..d0e8908 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5658,6 +5658,7 @@ version = "0.1.0" dependencies = [ "alloy", "anyhow", + "async-stream", "futures", "revive-common", "revive-dt-common", diff --git a/Cargo.toml b/Cargo.toml index 980866a..63eab61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ revive-dt-report = { version = "0.1.0", path = "crates/report" } revive-dt-solc-binaries = { version = "0.1.0", path = "crates/solc-binaries" } anyhow = "1.0" +async-stream = { version = "0.3.6" } bson = { version = "2.15.0" } cacache = { version = "13.1.0" } clap = { version = "4", features = ["derive"] } @@ -73,7 +74,7 @@ revive-solc-json-interface = { git = "https://github.com/paritytech/revive", rev revive-common = { git = "https://github.com/paritytech/revive", rev = "3389865af7c3ff6f29a586d82157e8bc573c1a8e" } revive-differential = { git = "https://github.com/paritytech/revive", rev = "3389865af7c3ff6f29a586d82157e8bc573c1a8e" } -zombienet-sdk = { git = "https://github.com/paritytech/zombienet-sdk.git", rev ="891f6554354ce466abd496366dbf8b4f82141241" } +zombienet-sdk = { git = "https://github.com/paritytech/zombienet-sdk.git", rev = "891f6554354ce466abd496366dbf8b4f82141241" } [workspace.dependencies.alloy] version = "1.0.37" diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index 7e8814b..46b978c 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -680,6 +680,14 @@ pub struct ReviveDevNodeConfiguration { value_parser = parse_duration )] pub start_timeout_ms: Duration, + + /// The consensus to use for the spawned revive-dev-node. + #[clap( + id = "revive-dev-node.consensus", + long = "revive-dev-node.consensus", + default_value = "instant-seal" + )] + pub consensus: String, } /// A set of configuration parameters for the ETH RPC. 
diff --git a/crates/core/src/differential_benchmarks/driver.rs b/crates/core/src/differential_benchmarks/driver.rs index abdc6fd..080e043 100644 --- a/crates/core/src/differential_benchmarks/driver.rs +++ b/crates/core/src/differential_benchmarks/driver.rs @@ -22,6 +22,7 @@ use alloy::{ }, }; use anyhow::{Context as _, Result, bail}; +use futures::TryFutureExt; use indexmap::IndexMap; use revive_dt_common::{ futures::{PollingWaitBehavior, poll}, @@ -35,7 +36,7 @@ use revive_dt_format::{ }, traits::{ResolutionContext, ResolverApi}, }; -use tokio::sync::{Mutex, mpsc::UnboundedSender}; +use tokio::sync::{Mutex, OnceCell, mpsc::UnboundedSender}; use tracing::{Instrument, Span, debug, error, field::display, info, info_span, instrument}; use crate::{ @@ -123,13 +124,7 @@ where &self.platform_information.reporter, ) .await - .inspect_err(|err| { - error!( - ?err, - platform_identifier = %self.platform_information.platform.platform_identifier(), - "Pre-linking compilation failed" - ) - }) + .inspect_err(|err| error!(?err, "Pre-linking compilation failed")) .context("Failed to produce the pre-linking compiled contracts")?; let mut deployed_libraries = None::>; @@ -137,13 +132,7 @@ where .test_definition .metadata .contract_sources() - .inspect_err(|err| { - error!( - ?err, - platform_identifier = %self.platform_information.platform.platform_identifier(), - "Failed to retrieve contract sources from metadata" - ) - }) + .inspect_err(|err| error!(?err, "Failed to retrieve contract sources from metadata")) .context("Failed to get the contract instances from the metadata file")?; for library_instance in self .test_definition @@ -191,20 +180,19 @@ where TransactionRequest::default().from(deployer_address), code, ); - let receipt = self.execute_transaction(tx).await.inspect_err(|err| { - error!( - ?err, - %library_instance, - platform_identifier = %self.platform_information.platform.platform_identifier(), - "Failed to deploy the library" - ) - })?; + let receipt = self + 
.execute_transaction(tx) + .and_then(|(_, receipt_fut)| receipt_fut) + .await + .inspect_err(|err| { + error!( + ?err, + %library_instance, + "Failed to deploy the library" + ) + })?; - debug!( - ?library_instance, - platform_identifier = %self.platform_information.platform.platform_identifier(), - "Deployed library" - ); + debug!(?library_instance, "Deployed library"); let library_address = receipt .contract_address @@ -227,13 +215,7 @@ where &self.platform_information.reporter, ) .await - .inspect_err(|err| { - error!( - ?err, - platform_identifier = %self.platform_information.platform.platform_identifier(), - "Post-linking compilation failed" - ) - }) + .inspect_err(|err| error!(?err, "Post-linking compilation failed")) .context("Failed to compile the post-link contracts")?; self.execution_state = ExecutionState::new( @@ -269,7 +251,6 @@ where skip_all, fields( driver_id = self.driver_id, - platform_identifier = %self.platform_information.platform.platform_identifier(), %step_path, ), err(Debug), @@ -305,15 +286,11 @@ where .handle_function_call_contract_deployment(step) .await .context("Failed to deploy contracts for the function call step")?; - let execution_receipt = self + let transaction_hash = self .handle_function_call_execution(step, deployment_receipts) .await .context("Failed to handle the function call execution")?; - let tracing_result = self - .handle_function_call_call_frame_tracing(execution_receipt.transaction_hash) - .await - .context("Failed to handle the function call call frame tracing")?; - self.handle_function_call_variable_assignment(step, &tracing_result) + self.handle_function_call_variable_assignment(step, transaction_hash) .await .context("Failed to handle function call variable assignment")?; Ok(1) @@ -367,18 +344,19 @@ where &mut self, step: &FunctionCallStep, mut deployment_receipts: HashMap, - ) -> Result { + ) -> Result { match step.method { // This step was already executed when `handle_step` was called. 
We just need to // lookup the transaction receipt in this case and continue on. Method::Deployer => deployment_receipts .remove(&step.instance) - .context("Failed to find deployment receipt for constructor call"), + .context("Failed to find deployment receipt for constructor call") + .map(|receipt| receipt.transaction_hash), Method::Fallback | Method::FunctionName(_) => { let tx = step .as_transaction(self.resolver.as_ref(), self.default_resolution_context()) .await?; - self.execute_transaction(tx).await + Ok(self.execute_transaction(tx).await?.0) } } } @@ -417,15 +395,19 @@ where async fn handle_function_call_variable_assignment( &mut self, step: &FunctionCallStep, - tracing_result: &CallFrame, + tx_hash: TxHash, ) -> Result<()> { let Some(ref assignments) = step.variable_assignments else { return Ok(()); }; // Handling the return data variable assignments. + let callframe = OnceCell::new(); for (variable_name, output_word) in assignments.return_data.iter().zip( - tracing_result + callframe + .get_or_try_init(|| self.handle_function_call_call_frame_tracing(tx_hash)) + .await + .context("Failed to get the callframe trace for transaction")? 
.output .as_ref() .unwrap_or_default() @@ -547,7 +529,6 @@ where skip_all, fields( driver_id = self.driver_id, - platform_identifier = %self.platform_information.platform.platform_identifier(), %contract_instance, %deployer ), @@ -590,7 +571,6 @@ where skip_all, fields( driver_id = self.driver_id, - platform_identifier = %self.platform_information.platform.platform_identifier(), %contract_instance, %deployer ), @@ -660,7 +640,11 @@ where TransactionBuilder::::with_deploy_code(tx, code) }; - let receipt = match self.execute_transaction(tx).await { + let receipt = match self + .execute_transaction(tx) + .and_then(|(_, receipt_fut)| receipt_fut) + .await + { Ok(receipt) => receipt, Err(error) => { tracing::error!(?error, "Contract deployment transaction failed."); @@ -734,7 +718,7 @@ where async fn execute_transaction( &self, transaction: TransactionRequest, - ) -> anyhow::Result { + ) -> anyhow::Result<(TxHash, impl Future>)> { let node = self.platform_information.node; let transaction_hash = node .submit_transaction(transaction) @@ -747,24 +731,28 @@ where .send(WatcherEvent::SubmittedTransaction { transaction_hash }) .context("Failed to send the transaction hash to the watcher")?; - info!("Starting to poll for transaction receipt"); - poll( - Duration::from_secs(30 * 60), - PollingWaitBehavior::Constant(Duration::from_secs(1)), - || { - async move { - match node.get_receipt(transaction_hash).await { - Ok(receipt) => { - info!("Polling succeeded, receipt found"); - Ok(ControlFlow::Break(receipt)) + Ok((transaction_hash, async move { + info!("Starting to poll for transaction receipt"); + poll( + Duration::from_secs(30 * 60), + PollingWaitBehavior::Constant(Duration::from_secs(1)), + || { + async move { + match node.get_receipt(transaction_hash).await { + Ok(receipt) => { + info!("Polling succeeded, receipt found"); + Ok(ControlFlow::Break(receipt)) + } + Err(_) => Ok(ControlFlow::Continue(())), } - Err(_) => Ok(ControlFlow::Continue(())), } - } - 
.instrument(info_span!("Polling for receipt")) - }, - ) - .await + .instrument(info_span!("Polling for receipt")) + }, + ) + .instrument(info_span!("Polling for receipt", %transaction_hash)) + .await + .inspect(|_| info!("Found the transaction receipt")) + })) } // endregion:Transaction Execution } diff --git a/crates/core/src/differential_benchmarks/entry_point.rs b/crates/core/src/differential_benchmarks/entry_point.rs index 7702d52..fd80aa3 100644 --- a/crates/core/src/differential_benchmarks/entry_point.rs +++ b/crates/core/src/differential_benchmarks/entry_point.rs @@ -8,7 +8,7 @@ use revive_dt_common::types::PrivateKeyAllocator; use revive_dt_core::Platform; use revive_dt_format::steps::{Step, StepIdx, StepPath}; use tokio::sync::Mutex; -use tracing::{error, info, info_span, instrument, warn}; +use tracing::{Instrument, error, info, info_span, instrument, warn}; use revive_dt_config::{BenchmarkingContext, Context}; use revive_dt_report::Reporter; @@ -159,12 +159,15 @@ pub async fn handle_differential_benchmarks( futures::future::try_join( watcher.run(), - driver.execute_all().inspect(|_| { - info!("All transactions submitted - driver completed execution"); - watcher_tx - .send(WatcherEvent::AllTransactionsSubmitted) - .unwrap() - }), + driver + .execute_all() + .instrument(info_span!("Executing Benchmarks", %platform_identifier)) + .inspect(|_| { + info!("All transactions submitted - driver completed execution"); + watcher_tx + .send(WatcherEvent::AllTransactionsSubmitted) + .unwrap() + }), ) .await .context("Failed to run the driver and executor") diff --git a/crates/core/src/differential_benchmarks/watcher.rs b/crates/core/src/differential_benchmarks/watcher.rs index 12ea840..96f0098 100644 --- a/crates/core/src/differential_benchmarks/watcher.rs +++ b/crates/core/src/differential_benchmarks/watcher.rs @@ -104,6 +104,10 @@ impl Watcher { async move { let mut mined_blocks_information = Vec::new(); + // region:TEMPORARY + eprintln!("Watcher information for 
{}", self.platform_identifier); + eprintln!("block_number,block_timestamp,mined_gas,block_gas_limit,tx_count"); + // endregion:TEMPORARY while let Some(block) = blocks_information_stream.next().await { // If the block number is equal to or less than the last block before the // repetition then we ignore it and continue on to the next block. @@ -118,8 +122,9 @@ impl Watcher { } info!( - remaining_transactions = watch_for_transaction_hashes.read().await.len(), + block_number = block.block_number, block_tx_count = block.transaction_hashes.len(), + remaining_transactions = watch_for_transaction_hashes.read().await.len(), "Observed a block" ); @@ -131,6 +136,20 @@ impl Watcher { watch_for_transaction_hashes.remove(tx_hash); } + // region:TEMPORARY + // TODO: The following core is TEMPORARY and will be removed once we have proper + // reporting in place and then it can be removed. This serves as as way of doing + // some very simple reporting for the time being. + eprintln!( + "\"{}\",\"{}\",\"{}\",\"{}\",\"{}\"", + block.block_number, + block.block_timestamp, + block.mined_gas, + block.block_gas_limit, + block.transaction_hashes.len() + ); + // endregion:TEMPORARY + mined_blocks_information.push(block); } @@ -139,41 +158,10 @@ impl Watcher { } }; - let (_, mined_blocks_information) = + let (_, _) = futures::future::join(watcher_event_watching_task, block_information_watching_task) .await; - // region:TEMPORARY - { - // TODO: The following core is TEMPORARY and will be removed once we have proper - // reporting in place and then it can be removed. This serves as as way of doing some - // very simple reporting for the time being. 
- use std::io::Write; - - let mut stderr = std::io::stderr().lock(); - writeln!( - stderr, - "Watcher information for {}", - self.platform_identifier - )?; - writeln!( - stderr, - "block_number,block_timestamp,mined_gas,block_gas_limit,tx_count" - )?; - for block in mined_blocks_information { - writeln!( - stderr, - "{},{},{},{},{}", - block.block_number, - block.block_timestamp, - block.mined_gas, - block.block_gas_limit, - block.transaction_hashes.len() - )? - } - } - // endregion:TEMPORARY - Ok(()) } } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 41ab789..005d2ef 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -16,7 +16,7 @@ use revive_dt_config::*; use revive_dt_node::{ Node, node_implementations::geth::GethNode, node_implementations::lighthouse_geth::LighthouseGethNode, - node_implementations::substrate::SubstrateNode, node_implementations::zombienet::ZombieNode, + node_implementations::substrate::SubstrateNode, node_implementations::zombienet::ZombienetNode, }; use revive_dt_node_interaction::EthereumNode; use tracing::info; @@ -184,6 +184,7 @@ impl Platform for KitchensinkPolkavmResolcPlatform { let node = SubstrateNode::new( kitchensink_path, SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND, + None, context, ); let node = spawn_node(node, genesis)?; @@ -236,6 +237,7 @@ impl Platform for KitchensinkRevmSolcPlatform { let node = SubstrateNode::new( kitchensink_path, SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND, + None, context, ); let node = spawn_node(node, genesis)?; @@ -280,14 +282,17 @@ impl Platform for ReviveDevNodePolkavmResolcPlatform { context: Context, ) -> anyhow::Result>>> { let genesis_configuration = AsRef::::as_ref(&context); - let revive_dev_node_path = AsRef::::as_ref(&context) - .path - .clone(); + let revive_dev_node_configuration = AsRef::::as_ref(&context); + + let revive_dev_node_path = revive_dev_node_configuration.path.clone(); + let revive_dev_node_consensus = 
revive_dev_node_configuration.consensus.clone(); + let genesis = genesis_configuration.genesis()?.clone(); Ok(thread::spawn(move || { let node = SubstrateNode::new( revive_dev_node_path, SubstrateNode::REVIVE_DEV_NODE_EXPORT_CHAINSPEC_COMMAND, + Some(revive_dev_node_consensus), context, ); let node = spawn_node(node, genesis)?; @@ -332,14 +337,17 @@ impl Platform for ReviveDevNodeRevmSolcPlatform { context: Context, ) -> anyhow::Result>>> { let genesis_configuration = AsRef::::as_ref(&context); - let revive_dev_node_path = AsRef::::as_ref(&context) - .path - .clone(); + let revive_dev_node_configuration = AsRef::::as_ref(&context); + + let revive_dev_node_path = revive_dev_node_configuration.path.clone(); + let revive_dev_node_consensus = revive_dev_node_configuration.consensus.clone(); + let genesis = genesis_configuration.genesis()?.clone(); Ok(thread::spawn(move || { let node = SubstrateNode::new( revive_dev_node_path, SubstrateNode::REVIVE_DEV_NODE_EXPORT_CHAINSPEC_COMMAND, + Some(revive_dev_node_consensus), context, ); let node = spawn_node(node, genesis)?; @@ -389,7 +397,7 @@ impl Platform for ZombienetPolkavmResolcPlatform { .clone(); let genesis = genesis_configuration.genesis()?.clone(); Ok(thread::spawn(move || { - let node = ZombieNode::new(polkadot_parachain_path, context); + let node = ZombienetNode::new(polkadot_parachain_path, context); let node = spawn_node(node, genesis)?; Ok(Box::new(node) as Box<_>) })) @@ -401,7 +409,7 @@ impl Platform for ZombienetPolkavmResolcPlatform { version: Option, ) -> Pin>>>> { Box::pin(async move { - let compiler = Solc::new(context, version).await; + let compiler = Resolc::new(context, version).await; compiler.map(|compiler| Box::new(compiler) as Box) }) } @@ -437,7 +445,7 @@ impl Platform for ZombienetRevmSolcPlatform { .clone(); let genesis = genesis_configuration.genesis()?.clone(); Ok(thread::spawn(move || { - let node = ZombieNode::new(polkadot_parachain_path, context); + let node = 
ZombienetNode::new(polkadot_parachain_path, context); let node = spawn_node(node, genesis)?; Ok(Box::new(node) as Box<_>) })) diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 9030b86..dfe0827 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -11,6 +11,7 @@ rust-version.workspace = true [dependencies] anyhow = { workspace = true } alloy = { workspace = true } +async-stream = { workspace = true } futures = { workspace = true } tracing = { workspace = true } tower = { workspace = true } diff --git a/crates/node/src/node_implementations/lighthouse_geth.rs b/crates/node/src/node_implementations/lighthouse_geth.rs index 2c82ba4..f119107 100644 --- a/crates/node/src/node_implementations/lighthouse_geth.rs +++ b/crates/node/src/node_implementations/lighthouse_geth.rs @@ -1131,6 +1131,7 @@ mod tests { } #[tokio::test] + #[ignore = "Ignored since they take a long time to run"] async fn node_mines_simple_transfer_transaction_and_returns_receipt() { // Arrange let (context, node) = new_node(); diff --git a/crates/node/src/node_implementations/substrate.rs b/crates/node/src/node_implementations/substrate.rs index a92b3da..ccf1e75 100644 --- a/crates/node/src/node_implementations/substrate.rs +++ b/crates/node/src/node_implementations/substrate.rs @@ -36,7 +36,8 @@ use alloy::{ }, }; use anyhow::Context as _; -use futures::{Stream, StreamExt}; +use async_stream::stream; +use futures::Stream; use revive_common::EVMVersion; use revive_dt_common::fs::clear_directory; use revive_dt_format::traits::ResolverApi; @@ -80,6 +81,7 @@ pub struct SubstrateNode { wallet: Arc, nonce_manager: CachedNonceManager, provider: OnceCell>>, + consensus: Option, } impl SubstrateNode { @@ -102,6 +104,7 @@ impl SubstrateNode { pub fn new( node_path: PathBuf, export_chainspec_command: &str, + consensus: Option, context: impl AsRef + AsRef + AsRef, @@ -131,6 +134,7 @@ impl SubstrateNode { wallet: wallet.clone(), nonce_manager: Default::default(), provider: 
Default::default(), + consensus, } } @@ -228,7 +232,7 @@ impl SubstrateNode { self.logs_directory.as_path(), self.node_binary.as_path(), |command, stdout_file, stderr_file| { - command + let cmd = command .arg("--dev") .arg("--chain") .arg(chainspec_path) @@ -245,9 +249,16 @@ impl SubstrateNode { .arg("all") .arg("--rpc-max-connections") .arg(u32::MAX.to_string()) + .arg("--pool-limit") + .arg(u32::MAX.to_string()) + .arg("--pool-kbytes") + .arg(u32::MAX.to_string()) .env("RUST_LOG", Self::SUBSTRATE_LOG_ENV) .stdout(stdout_file) .stderr(stderr_file); + if let Some(consensus) = self.consensus.as_ref() { + cmd.arg("--consensus").arg(consensus.clone()); + } }, ProcessReadinessWaitBehavior::TimeBoundedWaitFunction { max_wait_duration: Duration::from_secs(30), @@ -508,37 +519,46 @@ impl EthereumNode for SubstrateNode { + '_, >, > { + fn create_stream( + provider: ConcreteProvider>, + ) -> impl Stream { + stream! { + let mut block_number = provider.get_block_number().await.expect("Failed to get the block number"); + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + + let Ok(Some(block)) = provider.get_block_by_number(BlockNumberOrTag::Number(block_number)).await + else { + continue; + }; + + block_number += 1; + yield MinedBlockInformation { + block_number: block.number(), + block_timestamp: block.header.timestamp, + mined_gas: block.header.gas_used as _, + block_gas_limit: block.header.gas_limit, + transaction_hashes: block + .transactions + .into_hashes() + .as_hashes() + .expect("Must be hashes") + .to_vec(), + }; + }; + } + } + Box::pin(async move { let provider = self .provider() .await - .context("Failed to create the provider for block subscription")?; - let mut block_subscription = provider - .watch_full_blocks() - .await - .context("Failed to create the blocks stream")?; - block_subscription.set_channel_size(0xFFFF); - block_subscription.set_poll_interval(Duration::from_secs(1)); - let block_stream = block_subscription.into_stream(); + 
.context("Failed to create the provider for a block subscription")?; - let mined_block_information_stream = block_stream.filter_map(|block| async { - let block = block.ok()?; - Some(MinedBlockInformation { - block_number: block.number(), - block_timestamp: block.header.timestamp, - mined_gas: block.header.gas_used as _, - block_gas_limit: block.header.gas_limit, - transaction_hashes: block - .transactions - .into_hashes() - .as_hashes() - .expect("Must be hashes") - .to_vec(), - }) - }); + let stream = Box::pin(create_stream(provider)) + as Pin>>; - Ok(Box::pin(mined_block_information_stream) - as Pin>>) + Ok(stream) }) } } @@ -1170,6 +1190,7 @@ mod tests { let mut node = SubstrateNode::new( context.kitchensink_configuration.path.clone(), SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND, + None, &context, ); node.init(context.genesis_configuration.genesis().unwrap().clone()) @@ -1235,6 +1256,7 @@ mod tests { let mut dummy_node = SubstrateNode::new( context.kitchensink_configuration.path.clone(), SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND, + None, &context, ); @@ -1287,6 +1309,7 @@ mod tests { let node = SubstrateNode::new( context.kitchensink_configuration.path.clone(), SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND, + None, &context, ); diff --git a/crates/node/src/node_implementations/zombienet.rs b/crates/node/src/node_implementations/zombienet.rs index ca7e3a1..a5ac8d1 100644 --- a/crates/node/src/node_implementations/zombienet.rs +++ b/crates/node/src/node_implementations/zombienet.rs @@ -55,7 +55,8 @@ use alloy::{ }; use anyhow::Context as _; -use futures::{Stream, StreamExt}; +use async_stream::stream; +use futures::Stream; use revive_common::EVMVersion; use revive_dt_common::fs::clear_directory; use revive_dt_config::*; @@ -73,16 +74,19 @@ use crate::{ constants::INITIAL_BALANCE, helpers::{Process, ProcessReadinessWaitBehavior}, node_implementations::substrate::ReviveNetwork, - provider_utils::{ConcreteProvider, FallbackGasFiller, 
construct_concurrency_limited_provider}, + provider_utils::{ + ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider, + execute_transaction, + }, }; static NODE_COUNT: AtomicU32 = AtomicU32::new(0); -/// A Zombienet network where collator is `polkadot-parachain` node with `eth-rpc` -/// [`ZombieNode`] abstracts away the details of managing the zombienet network and provides -/// an interface to interact with the parachain's Ethereum RPC. +/// A Zombienet network where the collator is a `polkadot-parachain` node with `eth-rpc`. [`ZombienetNode`] +/// abstracts away the details of managing the zombienet network and provides an interface to +/// interact with the parachain's Ethereum RPC. #[derive(Debug, Default)] -pub struct ZombieNode { +pub struct ZombienetNode { /* Node Identifier */ id: u32, connection_string: String, @@ -110,7 +114,7 @@ pub struct ZombieNode { provider: OnceCell>>, } -impl ZombieNode { +impl ZombienetNode { const BASE_DIRECTORY: &str = "zombienet"; const DATA_DIRECTORY: &str = "data"; const LOGS_DIRECTORY: &str = "logs"; @@ -119,6 +123,8 @@ impl ZombieNode { const PARACHAIN_ID: u32 = 100; const ETH_RPC_BASE_PORT: u16 = 8545; + const PROXY_LOG_ENV: &str = "info,eth-rpc=debug"; + const ETH_RPC_READY_MARKER: &str = "Running JSON-RPC server"; const EXPORT_CHAINSPEC_COMMAND: &str = "build-spec"; @@ -178,25 +184,35 @@ impl ZombieNode { let node_rpc_port = Self::NODE_BASE_RPC_PORT + self.id as u16; let network_config = NetworkConfigBuilder::new() - .with_relaychain(|r| { - r.with_chain("westend-local") + .with_relaychain(|relay_chain| { + relay_chain + .with_chain("westend-local") .with_default_command("polkadot") .with_node(|node| node.with_name("alice")) .with_node(|node| node.with_name("bob")) }) - .with_global_settings(|g| g.with_base_dir(&self.base_directory)) - .with_parachain(|p| { - p.with_id(Self::PARACHAIN_ID) - .with_chain_spec_path(template_chainspec_path.to_str().unwrap()) + .with_global_settings(|global_settings| { + //
global_settings.with_base_dir(&self.base_directory) + global_settings + }) + .with_parachain(|parachain| { + parachain + .with_id(Self::PARACHAIN_ID) + .with_chain_spec_path(template_chainspec_path.to_path_buf()) .with_chain("asset-hub-westend-local") - .with_collator(|n| { - n.with_name("Collator") + .with_collator(|node_config| { + node_config + .with_name("Collator") .with_command(polkadot_parachain_path) .with_rpc_port(node_rpc_port) + .with_args(vec![ + ("--pool-limit", u32::MAX.to_string().as_str()).into(), + ("--pool-kbytes", u32::MAX.to_string().as_str()).into(), + ]) }) }) .build() - .map_err(|e| anyhow::anyhow!("Failed to build zombienet network config: {e:?}"))?; + .map_err(|err| anyhow::anyhow!("Failed to build zombienet network config: {err:?}"))?; self.node_rpc_port = Some(node_rpc_port); self.network_config = Some(network_config); @@ -210,6 +226,9 @@ impl ZombieNode { .clone() .context("Node not initialized, call init() first")?; + // TODO: Look into the possibility of removing this in the future, perhaps by reintroducing + // the blocking runtime abstraction and making it available to the entire program so that we + // don't need to be spawning multiple different runtimes. 
let rt = tokio::runtime::Runtime::new().unwrap(); let network = rt.block_on(async { network_config @@ -237,6 +256,7 @@ impl ZombieNode { .arg(u32::MAX.to_string()) .arg("--rpc-port") .arg(eth_rpc_port.to_string()) + .env("RUST_LOG", Self::PROXY_LOG_ENV) .stdout(stdout_file) .stderr(stderr_file); }, @@ -272,12 +292,13 @@ impl ZombieNode { template_chainspec_path: PathBuf, mut genesis: Genesis, ) -> anyhow::Result<()> { - let mut cmd: Command = std::process::Command::new(&self.polkadot_parachain_path); - cmd.arg(Self::EXPORT_CHAINSPEC_COMMAND) + let output = Command::new(self.polkadot_parachain_path.as_path()) + .arg(Self::EXPORT_CHAINSPEC_COMMAND) .arg("--chain") - .arg("asset-hub-westend-local"); - - let output = cmd.output().context("Failed to export the chain-spec")?; + .arg("asset-hub-westend-local") + .env_remove("RUST_LOG") + .output() + .context("Failed to export the chainspec of the chain")?; if !output.status.success() { anyhow::bail!( @@ -385,7 +406,7 @@ impl ZombieNode { .get_or_try_init(|| async move { construct_concurrency_limited_provider::( self.connection_string.as_str(), - FallbackGasFiller::new(250_000_000, 5_000_000_000, 1_000_000_000), + FallbackGasFiller::new(u64::MAX, 5_000_000_000, 1_000_000_000), ChainIdFiller::default(), // TODO: use CHAIN_ID constant NonceFiller::new(self.nonce_manager.clone()), self.wallet.clone(), @@ -398,7 +419,7 @@ impl ZombieNode { } } -impl EthereumNode for ZombieNode { +impl EthereumNode for ZombienetNode { fn pre_transactions(&mut self) -> Pin> + '_>> { Box::pin(async move { Ok(()) }) } @@ -448,17 +469,11 @@ impl EthereumNode for ZombieNode { transaction: alloy::rpc::types::TransactionRequest, ) -> Pin> + '_>> { Box::pin(async move { - let receipt = self + let provider = self .provider() .await - .context("Failed to create provider for transaction submission")? - .send_transaction(transaction) - .await - .context("Failed to submit transaction to proxy")? 
- .get_receipt() - .await - .context("Failed to fetch transaction receipt from proxy")?; - Ok(receipt) + .context("Failed to create the provider")?; + execute_transaction(provider, transaction).await }) } @@ -552,37 +567,46 @@ impl EthereumNode for ZombieNode { + '_, >, > { + fn create_stream( + provider: ConcreteProvider>, + ) -> impl Stream { + stream! { + let mut block_number = provider.get_block_number().await.expect("Failed to get the block number"); + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + + let Ok(Some(block)) = provider.get_block_by_number(BlockNumberOrTag::Number(block_number)).await + else { + continue; + }; + + block_number += 1; + yield MinedBlockInformation { + block_number: block.number(), + block_timestamp: block.header.timestamp, + mined_gas: block.header.gas_used as _, + block_gas_limit: block.header.gas_limit, + transaction_hashes: block + .transactions + .into_hashes() + .as_hashes() + .expect("Must be hashes") + .to_vec(), + }; + }; + } + } + Box::pin(async move { let provider = self .provider() .await - .context("Failed to create the provider for block subscription")?; - let mut block_subscription = provider - .watch_full_blocks() - .await - .context("Failed to create the blocks stream")?; - block_subscription.set_channel_size(0xFFFF); - block_subscription.set_poll_interval(Duration::from_secs(1)); - let block_stream = block_subscription.into_stream(); + .context("Failed to create the provider for a block subscription")?; - let mined_block_information_stream = block_stream.filter_map(|block| async { - let block = block.ok()?; - Some(MinedBlockInformation { - block_number: block.number(), - block_timestamp: block.header.timestamp, - mined_gas: block.header.gas_used as _, - block_gas_limit: block.header.gas_limit, - transaction_hashes: block - .transactions - .into_hashes() - .as_hashes() - .expect("Must be hashes") - .to_vec(), - }) - }); + let stream = Box::pin(create_stream(provider)) + as Pin>>; - 
Ok(Box::pin(mined_block_information_stream) - as Pin>>) + Ok(stream) }) } } @@ -717,7 +741,7 @@ impl, P: Provider> ResolverApi } } -impl Node for ZombieNode { +impl Node for ZombienetNode { fn shutdown(&mut self) -> anyhow::Result<()> { // Kill the eth_rpc process drop(self.eth_rpc_process.take()); @@ -762,7 +786,7 @@ impl Node for ZombieNode { } } -impl Drop for ZombieNode { +impl Drop for ZombienetNode { fn drop(&mut self) { let _ = self.shutdown(); } @@ -786,9 +810,9 @@ mod tests { TestExecutionContext::default() } - pub async fn new_node() -> (TestExecutionContext, ZombieNode) { + pub async fn new_node() -> (TestExecutionContext, ZombienetNode) { let context = test_config(); - let mut node = ZombieNode::new( + let mut node = ZombienetNode::new( context.polkadot_parachain_configuration.path.clone(), &context, ); @@ -806,8 +830,9 @@ mod tests { (context, node) } - pub async fn shared_state() -> &'static (TestExecutionContext, Arc) { - static NODE: OnceCell<(TestExecutionContext, Arc)> = OnceCell::const_new(); + pub async fn shared_state() -> &'static (TestExecutionContext, Arc) { + static NODE: OnceCell<(TestExecutionContext, Arc)> = + OnceCell::const_new(); NODE.get_or_init(|| async { let (context, node) = new_node().await; @@ -816,13 +841,14 @@ mod tests { .await } - pub async fn shared_node() -> &'static Arc { + pub async fn shared_node() -> &'static Arc { &shared_state().await.1 } } use utils::{new_node, test_config}; #[tokio::test] + #[ignore = "Ignored for the time being"] async fn test_transfer_transaction_should_return_receipt() { let (ctx, node) = new_node().await; @@ -856,7 +882,7 @@ mod tests { "#; let context = test_config(); - let mut node = ZombieNode::new( + let mut node = ZombienetNode::new( context.polkadot_parachain_configuration.path.clone(), &context, ); @@ -866,17 +892,19 @@ mod tests { .expect("init failed"); // Check that the patched chainspec file was generated - let final_chainspec_path = 
node.base_directory.join(ZombieNode::CHAIN_SPEC_JSON_FILE); + let final_chainspec_path = node + .base_directory + .join(ZombienetNode::CHAIN_SPEC_JSON_FILE); assert!(final_chainspec_path.exists(), "Chainspec file should exist"); let contents = std::fs::read_to_string(&final_chainspec_path).expect("Failed to read chainspec"); // Validate that the Polkadot addresses derived from the Ethereum addresses are in the file - let first_eth_addr = ZombieNode::eth_to_polkadot_address( + let first_eth_addr = ZombienetNode::eth_to_polkadot_address( &"90F8bf6A479f320ead074411a4B0e7944Ea8c9C1".parse().unwrap(), ); - let second_eth_addr = ZombieNode::eth_to_polkadot_address( + let second_eth_addr = ZombienetNode::eth_to_polkadot_address( &"Ab8483F64d9C6d1EcF9b849Ae677dD3315835cb2".parse().unwrap(), ); @@ -904,7 +932,7 @@ mod tests { "#; let context = test_config(); - let node = ZombieNode::new( + let node = ZombienetNode::new( context.polkadot_parachain_configuration.path.clone(), &context, ); @@ -940,7 +968,7 @@ mod tests { ]; for eth_addr in eth_addresses { - let ss58 = ZombieNode::eth_to_polkadot_address(ð_addr.parse().unwrap()); + let ss58 = ZombienetNode::eth_to_polkadot_address(ð_addr.parse().unwrap()); println!("Ethereum: {eth_addr} -> Polkadot SS58: {ss58}"); } @@ -968,7 +996,7 @@ mod tests { ]; for (eth_addr, expected_ss58) in cases { - let result = ZombieNode::eth_to_polkadot_address(ð_addr.parse().unwrap()); + let result = ZombienetNode::eth_to_polkadot_address(ð_addr.parse().unwrap()); assert_eq!( result, expected_ss58, "Mismatch for Ethereum address {eth_addr}" @@ -980,7 +1008,7 @@ mod tests { fn eth_rpc_version_works() { // Arrange let context = test_config(); - let node = ZombieNode::new( + let node = ZombienetNode::new( context.polkadot_parachain_configuration.path.clone(), &context, ); @@ -999,7 +1027,7 @@ mod tests { fn version_works() { // Arrange let context = test_config(); - let node = ZombieNode::new( + let node = ZombienetNode::new( 
context.polkadot_parachain_configuration.path.clone(), &context, ); diff --git a/crates/node/src/provider_utils/fallback_gas_provider.rs b/crates/node/src/provider_utils/fallback_gas_filler.rs similarity index 74% rename from crates/node/src/provider_utils/fallback_gas_provider.rs rename to crates/node/src/provider_utils/fallback_gas_filler.rs index ff74ea2..c218316 100644 --- a/crates/node/src/provider_utils/fallback_gas_provider.rs +++ b/crates/node/src/provider_utils/fallback_gas_filler.rs @@ -7,6 +7,10 @@ use alloy::{ transports::TransportResult, }; +// Percentage padding applied to estimated gas (e.g. 120 = 20% padding) +const GAS_ESTIMATE_PADDING_NUMERATOR: u64 = 120; +const GAS_ESTIMATE_PADDING_DENOMINATOR: u64 = 100; + #[derive(Clone, Debug)] pub struct FallbackGasFiller { inner: GasFiller, @@ -56,8 +60,6 @@ where provider: &P, tx: &::TransactionRequest, ) -> TransportResult { - // Try to fetch GasFiller’s “fillable” (gas_price, base_fee, estimate_gas, …) - // If it errors (i.e. tx would revert under eth_estimateGas), swallow it. 
match self.inner.prepare(provider, tx).await { Ok(fill) => Ok(Some(fill)), Err(_) => Ok(None), @@ -70,8 +72,17 @@ where mut tx: alloy::providers::SendableTx, ) -> TransportResult> { if let Some(fill) = fillable { - // our inner GasFiller succeeded — use it - self.inner.fill(fill, tx).await + let mut tx = self.inner.fill(fill, tx).await?; + if let Some(builder) = tx.as_mut_builder() { + if let Some(estimated) = builder.gas_limit() { + let padded = estimated + .checked_mul(GAS_ESTIMATE_PADDING_NUMERATOR) + .and_then(|v| v.checked_div(GAS_ESTIMATE_PADDING_DENOMINATOR)) + .unwrap_or(u64::MAX); + builder.set_gas_limit(padded); + } + } + Ok(tx) } else { if let Some(builder) = tx.as_mut_builder() { builder.set_gas_limit(self.default_gas_limit); diff --git a/crates/node/src/provider_utils/mod.rs b/crates/node/src/provider_utils/mod.rs index b0738da..c556a71 100644 --- a/crates/node/src/provider_utils/mod.rs +++ b/crates/node/src/provider_utils/mod.rs @@ -1,7 +1,7 @@ mod concurrency_limiter; -mod fallback_gas_provider; +mod fallback_gas_filler; mod provider; pub use concurrency_limiter::*; -pub use fallback_gas_provider::*; +pub use fallback_gas_filler::*; pub use provider::*; diff --git a/crates/node/src/provider_utils/provider.rs b/crates/node/src/provider_utils/provider.rs index 6862537..f10b3b6 100644 --- a/crates/node/src/provider_utils/provider.rs +++ b/crates/node/src/provider_utils/provider.rs @@ -10,7 +10,7 @@ use alloy::{ }; use anyhow::{Context, Result}; use revive_dt_common::futures::{PollingWaitBehavior, poll}; -use tracing::debug; +use tracing::{Instrument, debug, info, info_span}; use crate::provider_utils::{ConcurrencyLimiterLayer, FallbackGasFiller}; @@ -44,7 +44,7 @@ where // requests at any point of time and no more than that. This is done in an effort to stabilize // the framework from some of the interment issues that we've been seeing related to RPC calls. 
static GLOBAL_CONCURRENCY_LIMITER_LAYER: LazyLock = - LazyLock::new(|| ConcurrencyLimiterLayer::new(10)); + LazyLock::new(|| ConcurrencyLimiterLayer::new(500)); let client = ClientBuilder::default() .layer(GLOBAL_CONCURRENCY_LIMITER_LAYER.clone()) @@ -117,12 +117,16 @@ where async move { match provider.get_transaction_receipt(tx_hash).await { - Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)), + Ok(Some(receipt)) => { + info!("Found the transaction receipt"); + Ok(ControlFlow::Break(receipt)) + } _ => Ok(ControlFlow::Continue(())), } } }, ) + .instrument(info_span!("Polling for receipt", %tx_hash)) .await .context(format!("Polling for receipt failed for {tx_hash}")) }