diff --git a/crates/core/src/differential_tests/driver.rs b/crates/core/src/differential_tests/driver.rs index deb6d15..d98c2ad 100644 --- a/crates/core/src/differential_tests/driver.rs +++ b/crates/core/src/differential_tests/driver.rs @@ -31,7 +31,7 @@ use revive_dt_format::{ traits::ResolutionContext, }; use tokio::sync::Mutex; -use tracing::{debug, error, info, instrument}; +use tracing::{error, info, instrument}; use crate::{ differential_tests::ExecutionState, @@ -109,7 +109,6 @@ impl<'a> Driver<'a, StepsIterator> { // endregion:Constructors // region:Execution - #[instrument(level = "info", skip_all)] pub async fn execute_all(mut self) -> Result { let platform_drivers = std::mem::take(&mut self.platform_drivers); let results = futures::future::try_join_all( @@ -218,8 +217,6 @@ where .flatten() .flat_map(|(_, map)| map.values()) { - debug!(%library_instance, "Deploying Library Instance"); - let ContractPathAndIdent { contract_source_path: library_source_path, contract_ident: library_ident, @@ -268,12 +265,6 @@ where ) })?; - debug!( - ?library_instance, - platform_identifier = %platform_information.platform.platform_identifier(), - "Deployed library" - ); - let library_address = receipt .contract_address .expect("Failed to deploy the library"); @@ -312,7 +303,6 @@ where // endregion:Constructors & Initialization // region:Step Handling - #[instrument(level = "info", skip_all)] pub async fn execute_all(mut self) -> Result { while let Some(result) = self.execute_next_step().await { result? @@ -320,14 +310,6 @@ where Ok(self.steps_executed) } - #[instrument( - level = "info", - skip_all, - fields( - platform_identifier = %self.platform_information.platform.platform_identifier(), - node_id = self.platform_information.node.id(), - ), - )] pub async fn execute_next_step(&mut self) -> Option> { let (step_path, step) = self.steps_iterator.next()?; info!(%step_path, "Executing Step"); @@ -344,6 +326,7 @@ where skip_all, fields( platform_identifier = %self.platform_information.platform.platform_identifier(), + node_id = self.platform_information.node.id(), %step_path, ), err(Debug), @@ -402,6 +385,7 @@ where Ok(1) } + #[instrument(level = "debug", skip_all)] async fn handle_function_call_contract_deployment( &mut self, step: &FunctionCallStep, @@ -447,6 +431,7 @@ where Ok(receipts) } + #[instrument(level = "debug", skip_all)] async fn handle_function_call_execution( &mut self, step: &FunctionCallStep, @@ -470,14 +455,12 @@ where } }; - match self.platform_information.node.execute_transaction(tx).await { - Ok(receipt) => Ok(receipt), - Err(err) => Err(err), - } + self.platform_information.node.execute_transaction(tx).await } } } + #[instrument(level = "debug", skip_all)] async fn handle_function_call_call_frame_tracing( &mut self, tx_hash: TxHash, @@ -509,6 +492,7 @@ where }) } + #[instrument(level = "debug", skip_all)] async fn handle_function_call_variable_assignment( &mut self, step: &FunctionCallStep, @@ -541,6 +525,7 @@ where Ok(()) } + #[instrument(level = "debug", skip_all)] async fn handle_function_call_assertions( &mut self, step: &FunctionCallStep, @@ -583,6 +568,7 @@ where .await } + #[instrument(level = "debug", skip_all)] async fn handle_function_call_assertion_item( &self, receipt: &TransactionReceipt, @@ -865,7 +851,6 @@ where level = "info", skip_all, fields( - platform_identifier = %self.platform_information.platform.platform_identifier(), %contract_instance, %deployer ), @@ -907,7 +892,6 @@ where level = "info", skip_all, fields( - platform_identifier = %self.platform_information.platform.platform_identifier(), %contract_instance, %deployer ), diff --git a/crates/core/src/differential_tests/entry_point.rs b/crates/core/src/differential_tests/entry_point.rs index 0fea540..a2b6c53 100644 --- a/crates/core/src/differential_tests/entry_point.rs +++ b/crates/core/src/differential_tests/entry_point.rs @@ -1,17 +1,17 @@ //! The main entry point into differential testing. use std::{ - collections::BTreeMap, + collections::{BTreeMap, BTreeSet}, io::{BufWriter, Write, stderr}, sync::Arc, - time::Instant, + time::{Duration, Instant}, }; use anyhow::Context as _; use futures::{FutureExt, StreamExt}; use revive_dt_common::types::PrivateKeyAllocator; use revive_dt_core::Platform; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, RwLock, Semaphore}; use tracing::{Instrument, error, info, info_span, instrument}; use revive_dt_config::{Context, TestExecutionContext}; @@ -101,20 +101,40 @@ pub async fn handle_differential_tests( ))); // Creating the driver and executing all of the steps. - let driver_task = futures::future::join_all(test_definitions.iter().map(|test_definition| { - let private_key_allocator = private_key_allocator.clone(); - let cached_compiler = cached_compiler.clone(); - let mode = test_definition.mode.clone(); - let span = info_span!( - "Executing Test Case", - metadata_file_path = %test_definition.metadata_file_path.display(), - case_idx = %test_definition.case_idx, - mode = %mode - ); - async move { - let driver = - match Driver::new_root(test_definition, private_key_allocator, &cached_compiler) - .await + let semaphore = context + .concurrency_configuration + .concurrency_limit() + .map(Semaphore::new) + .map(Arc::new); + let running_task_list = Arc::new(RwLock::new(BTreeSet::::new())); + let driver_task = futures::future::join_all(test_definitions.iter().enumerate().map( + |(test_id, test_definition)| { + let running_task_list = running_task_list.clone(); + let semaphore = semaphore.clone(); + + let private_key_allocator = private_key_allocator.clone(); + let cached_compiler = cached_compiler.clone(); + let mode = test_definition.mode.clone(); + let span = info_span!( + "Executing Test Case", + test_id, + metadata_file_path = %test_definition.metadata_file_path.display(), + case_idx = %test_definition.case_idx, + mode = %mode, + ); + async move { + let permit = match semaphore.as_ref() { + Some(semaphore) => Some(semaphore.acquire().await.expect("Can't fail")), + None => None, + }; + + running_task_list.write().await.insert(test_id); + let driver = match Driver::new_root( + test_definition, + private_key_allocator, + &cached_compiler, + ) + .await { Ok(driver) => driver, Err(error) => { @@ -123,28 +143,33 @@ pub async fn handle_differential_tests( .report_test_failed_event(format!("{error:#}")) .expect("Can't fail"); error!("Test Case Failed"); + drop(permit); + running_task_list.write().await.remove(&test_id); return; } }; - info!("Created the driver for the test case"); + info!("Created the driver for the test case"); - match driver.execute_all().await { - Ok(steps_executed) => test_definition - .reporter - .report_test_succeeded_event(steps_executed) - .expect("Can't fail"), - Err(error) => { - test_definition + match driver.execute_all().await { + Ok(steps_executed) => test_definition .reporter - .report_test_failed_event(format!("{error:#}")) - .expect("Can't fail"); - error!("Test Case Failed"); - } - }; - info!("Finished the execution of the test case") - } - .instrument(span) - })) + .report_test_succeeded_event(steps_executed) + .expect("Can't fail"), + Err(error) => { + test_definition + .reporter + .report_test_failed_event(format!("{error:#}")) + .expect("Can't fail"); + error!("Test Case Failed"); + } + }; + info!("Finished the execution of the test case"); + drop(permit); + running_task_list.write().await.remove(&test_id); + } + .instrument(span) + }, + )) .inspect(|_| { info!("Finished executing all test cases"); reporter_clone @@ -153,6 +178,18 @@ pub async fn handle_differential_tests( }); let cli_reporting_task = start_cli_reporting_task(reporter); + tokio::task::spawn(async move { + loop { + let remaining_tasks = running_task_list.read().await; + info!( + count = remaining_tasks.len(), + ?remaining_tasks, + "Remaining Tests" + ); + tokio::time::sleep(Duration::from_secs(10)).await + } + }); + futures::future::join(driver_task, cli_reporting_task).await; Ok(()) diff --git a/crates/node/src/node_implementations/lighthouse_geth.rs b/crates/node/src/node_implementations/lighthouse_geth.rs index ab855f5..2c82ba4 100644 --- a/crates/node/src/node_implementations/lighthouse_geth.rs +++ b/crates/node/src/node_implementations/lighthouse_geth.rs @@ -17,7 +17,7 @@ use std::{ pin::Pin, process::{Command, Stdio}, sync::{ - Arc, LazyLock, + Arc, atomic::{AtomicU32, Ordering}, }, time::{Duration, SystemTime, UNIX_EPOCH}, @@ -47,7 +47,7 @@ use futures::{Stream, StreamExt}; use revive_common::EVMVersion; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_with::serde_as; -use tokio::sync::{OnceCell, Semaphore}; +use tokio::sync::OnceCell; use tracing::{Instrument, info, instrument}; use revive_dt_common::{ @@ -105,7 +105,6 @@ pub struct LighthouseGethNode { persistent_http_provider: OnceCell>>, persistent_ws_provider: OnceCell>>, - http_provider_requests_semaphore: LazyLock, } impl LighthouseGethNode { @@ -176,7 +175,6 @@ impl LighthouseGethNode { nonce_manager: Default::default(), persistent_http_provider: OnceCell::const_new(), persistent_ws_provider: OnceCell::const_new(), - http_provider_requests_semaphore: LazyLock::new(|| Semaphore::const_new(500)), } } @@ -566,8 +564,6 @@ impl EthereumNode for LighthouseGethNode { transaction: TransactionRequest, ) -> Pin> + '_>> { Box::pin(async move { - let _permit = self.http_provider_requests_semaphore.acquire().await; - let provider = self .http_provider() .await diff --git a/crates/node/src/node_implementations/substrate.rs b/crates/node/src/node_implementations/substrate.rs index fd746a1..a92b3da 100644 --- a/crates/node/src/node_implementations/substrate.rs +++ b/crates/node/src/node_implementations/substrate.rs @@ -54,7 +54,10 @@ use crate::{ Node, constants::{CHAIN_ID, INITIAL_BALANCE}, helpers::{Process, ProcessReadinessWaitBehavior}, - provider_utils::{ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider}, + provider_utils::{ + ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider, + execute_transaction, + }, }; static NODE_COUNT: AtomicU32 = AtomicU32::new(0); @@ -80,7 +83,7 @@ pub struct SubstrateNode { } impl SubstrateNode { - const BASE_DIRECTORY: &str = "Substrate"; + const BASE_DIRECTORY: &str = "substrate"; const LOGS_DIRECTORY: &str = "logs"; const DATA_DIRECTORY: &str = "chains"; @@ -346,7 +349,7 @@ impl SubstrateNode { .get_or_try_init(|| async move { construct_concurrency_limited_provider::( self.rpc_url.as_str(), - FallbackGasFiller::new(250_000_000, 5_000_000_000, 1_000_000_000), + FallbackGasFiller::new(u64::MAX, 5_000_000_000, 1_000_000_000), ChainIdFiller::new(Some(CHAIN_ID)), NonceFiller::new(self.nonce_manager.clone()), self.wallet.clone(), @@ -408,23 +411,12 @@ impl EthereumNode for SubstrateNode { &self, transaction: TransactionRequest, ) -> Pin> + '_>> { - static SEMAPHORE: std::sync::LazyLock = - std::sync::LazyLock::new(|| tokio::sync::Semaphore::new(500)); - Box::pin(async move { - let _permit = SEMAPHORE.acquire().await?; - - let receipt = self + let provider = self .provider() .await - .context("Failed to create provider for transaction submission")? - .send_transaction(transaction) - .await - .context("Failed to submit transaction to substrate proxy")? - .get_receipt() - .await - .context("Failed to fetch transaction receipt from substrate proxy")?; - Ok(receipt) + .context("Failed to create the provider")?; + execute_transaction(provider, transaction).await }) } diff --git a/crates/node/src/provider_utils/provider.rs b/crates/node/src/provider_utils/provider.rs index e69ed20..6862537 100644 --- a/crates/node/src/provider_utils/provider.rs +++ b/crates/node/src/provider_utils/provider.rs @@ -1,14 +1,16 @@ -use std::sync::LazyLock; +use std::{ops::ControlFlow, sync::LazyLock, time::Duration}; use alloy::{ - network::{Network, NetworkWallet, TransactionBuilder4844}, + network::{Ethereum, Network, NetworkWallet, TransactionBuilder4844}, providers::{ - Identity, ProviderBuilder, RootProvider, + Identity, PendingTransactionBuilder, Provider, ProviderBuilder, RootProvider, fillers::{ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller, WalletFiller}, }, rpc::client::ClientBuilder, }; use anyhow::{Context, Result}; +use revive_dt_common::futures::{PollingWaitBehavior, poll}; +use tracing::debug; use crate::provider_utils::{ConcurrencyLimiterLayer, FallbackGasFiller}; @@ -61,3 +63,66 @@ where Ok(provider) } + +pub async fn execute_transaction( + provider: ConcreteProvider, + transaction: N::TransactionRequest, +) -> Result +where + N: Network< + TransactionRequest: TransactionBuilder4844, + TxEnvelope = ::TxEnvelope, + >, + W: NetworkWallet, + Identity: TxFiller, + FallbackGasFiller: TxFiller, + ChainIdFiller: TxFiller, + NonceFiller: TxFiller, + WalletFiller: TxFiller, +{ + let sendable_transaction = provider + .fill(transaction) + .await + .context("Failed to fill transaction")?; + + let transaction_envelope = sendable_transaction + .try_into_envelope() + .context("Failed to convert transaction into an envelope")?; + let tx_hash = *transaction_envelope.tx_hash(); + + let mut pending_transaction = match provider.send_tx_envelope(transaction_envelope).await { + Ok(pending_transaction) => pending_transaction, + Err(error) => { + let error_string = error.to_string(); + + if error_string.contains("Transaction Already Imported") { + PendingTransactionBuilder::::new(provider.root().clone(), tx_hash) + } else { + return Err(error).context(format!("Failed to submit transaction {tx_hash}")); + } + } + }; + debug!(%tx_hash, "Submitted Transaction"); + + pending_transaction.set_timeout(Some(Duration::from_secs(120))); + let tx_hash = pending_transaction.watch().await.context(format!( + "Transaction inclusion watching timeout for {tx_hash}" + ))?; + + poll( + Duration::from_secs(60), + PollingWaitBehavior::Constant(Duration::from_secs(3)), + || { + let provider = provider.clone(); + + async move { + match provider.get_transaction_receipt(tx_hash).await { + Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)), + _ => Ok(ControlFlow::Continue(())), + } + } + }, + ) + .await + .context(format!("Polling for receipt failed for {tx_hash}")) +} diff --git a/run_tests.sh b/run_tests.sh index f569156..8f17191 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -76,8 +76,6 @@ cat > "$CORPUS_FILE" << EOF { "name": "MatterLabs Solidity Simple, Complex, and Semantic Tests", "paths": [ - "$(realpath "$TEST_REPO_DIR/fixtures/solidity/translated_semantic_tests")", - "$(realpath "$TEST_REPO_DIR/fixtures/solidity/complex")", "$(realpath "$TEST_REPO_DIR/fixtures/solidity/simple")" ] } @@ -95,11 +93,12 @@ echo "" # Run the tool cargo build --release; RUST_LOG="info,alloy_pubsub::service=error" ./target/release/retester test \ - --platform geth-evm-solc \ --platform revive-dev-node-revm-solc \ --corpus "$CORPUS_FILE" \ --working-directory "$WORKDIR" \ - --concurrency.number-of-nodes 5 \ + --concurrency.number-of-nodes 10 \ + --concurrency.number-of-threads 5 \ + --concurrency.number-of-concurrent-tasks 1000 \ --wallet.additional-keys 100000 \ --kitchensink.path "$SUBSTRATE_NODE_BIN" \ --revive-dev-node.path "$REVIVE_DEV_NODE_BIN" \