Compare commits

...

4 Commits

Author SHA1 Message Date
Omar Abdulla 9a518a01fb Fix the concurrency & the substrate gas limit fallback value 2025-10-07 05:07:35 +03:00
Omar Abdulla afd75ba535 Fix the fallback gas for substrate chains 2025-10-07 00:08:32 +03:00
Omar Abdulla 983ee7f355 Merge remote-tracking branch 'origin/main' into refactor/update-compiler-semaphore 2025-10-06 01:27:35 +03:00
Omar Abdulla bd983a0919 Update compiler semaphore 2025-10-06 01:26:39 +03:00
7 changed files with 163 additions and 90 deletions
+9 -25
View File
@@ -31,7 +31,7 @@ use revive_dt_format::{
traits::ResolutionContext, traits::ResolutionContext,
}; };
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument}; use tracing::{error, info, instrument};
use crate::{ use crate::{
differential_tests::ExecutionState, differential_tests::ExecutionState,
@@ -109,7 +109,6 @@ impl<'a> Driver<'a, StepsIterator> {
// endregion:Constructors // endregion:Constructors
// region:Execution // region:Execution
#[instrument(level = "info", skip_all)]
pub async fn execute_all(mut self) -> Result<usize> { pub async fn execute_all(mut self) -> Result<usize> {
let platform_drivers = std::mem::take(&mut self.platform_drivers); let platform_drivers = std::mem::take(&mut self.platform_drivers);
let results = futures::future::try_join_all( let results = futures::future::try_join_all(
@@ -218,8 +217,6 @@ where
.flatten() .flatten()
.flat_map(|(_, map)| map.values()) .flat_map(|(_, map)| map.values())
{ {
debug!(%library_instance, "Deploying Library Instance");
let ContractPathAndIdent { let ContractPathAndIdent {
contract_source_path: library_source_path, contract_source_path: library_source_path,
contract_ident: library_ident, contract_ident: library_ident,
@@ -268,12 +265,6 @@ where
) )
})?; })?;
debug!(
?library_instance,
platform_identifier = %platform_information.platform.platform_identifier(),
"Deployed library"
);
let library_address = receipt let library_address = receipt
.contract_address .contract_address
.expect("Failed to deploy the library"); .expect("Failed to deploy the library");
@@ -312,7 +303,6 @@ where
// endregion:Constructors & Initialization // endregion:Constructors & Initialization
// region:Step Handling // region:Step Handling
#[instrument(level = "info", skip_all)]
pub async fn execute_all(mut self) -> Result<usize> { pub async fn execute_all(mut self) -> Result<usize> {
while let Some(result) = self.execute_next_step().await { while let Some(result) = self.execute_next_step().await {
result? result?
@@ -320,14 +310,6 @@ where
Ok(self.steps_executed) Ok(self.steps_executed)
} }
#[instrument(
level = "info",
skip_all,
fields(
platform_identifier = %self.platform_information.platform.platform_identifier(),
node_id = self.platform_information.node.id(),
),
)]
pub async fn execute_next_step(&mut self) -> Option<Result<()>> { pub async fn execute_next_step(&mut self) -> Option<Result<()>> {
let (step_path, step) = self.steps_iterator.next()?; let (step_path, step) = self.steps_iterator.next()?;
info!(%step_path, "Executing Step"); info!(%step_path, "Executing Step");
@@ -344,6 +326,7 @@ where
skip_all, skip_all,
fields( fields(
platform_identifier = %self.platform_information.platform.platform_identifier(), platform_identifier = %self.platform_information.platform.platform_identifier(),
node_id = self.platform_information.node.id(),
%step_path, %step_path,
), ),
err(Debug), err(Debug),
@@ -402,6 +385,7 @@ where
Ok(1) Ok(1)
} }
#[instrument(level = "debug", skip_all)]
async fn handle_function_call_contract_deployment( async fn handle_function_call_contract_deployment(
&mut self, &mut self,
step: &FunctionCallStep, step: &FunctionCallStep,
@@ -447,6 +431,7 @@ where
Ok(receipts) Ok(receipts)
} }
#[instrument(level = "debug", skip_all)]
async fn handle_function_call_execution( async fn handle_function_call_execution(
&mut self, &mut self,
step: &FunctionCallStep, step: &FunctionCallStep,
@@ -470,14 +455,12 @@ where
} }
}; };
match self.platform_information.node.execute_transaction(tx).await { self.platform_information.node.execute_transaction(tx).await
Ok(receipt) => Ok(receipt),
Err(err) => Err(err),
}
} }
} }
} }
#[instrument(level = "debug", skip_all)]
async fn handle_function_call_call_frame_tracing( async fn handle_function_call_call_frame_tracing(
&mut self, &mut self,
tx_hash: TxHash, tx_hash: TxHash,
@@ -509,6 +492,7 @@ where
}) })
} }
#[instrument(level = "debug", skip_all)]
async fn handle_function_call_variable_assignment( async fn handle_function_call_variable_assignment(
&mut self, &mut self,
step: &FunctionCallStep, step: &FunctionCallStep,
@@ -541,6 +525,7 @@ where
Ok(()) Ok(())
} }
#[instrument(level = "debug", skip_all)]
async fn handle_function_call_assertions( async fn handle_function_call_assertions(
&mut self, &mut self,
step: &FunctionCallStep, step: &FunctionCallStep,
@@ -583,6 +568,7 @@ where
.await .await
} }
#[instrument(level = "debug", skip_all)]
async fn handle_function_call_assertion_item( async fn handle_function_call_assertion_item(
&self, &self,
receipt: &TransactionReceipt, receipt: &TransactionReceipt,
@@ -865,7 +851,6 @@ where
level = "info", level = "info",
skip_all, skip_all,
fields( fields(
platform_identifier = %self.platform_information.platform.platform_identifier(),
%contract_instance, %contract_instance,
%deployer %deployer
), ),
@@ -907,7 +892,6 @@ where
level = "info", level = "info",
skip_all, skip_all,
fields( fields(
platform_identifier = %self.platform_information.platform.platform_identifier(),
%contract_instance, %contract_instance,
%deployer %deployer
), ),
@@ -1,17 +1,17 @@
//! The main entry point into differential testing. //! The main entry point into differential testing.
use std::{ use std::{
collections::BTreeMap, collections::{BTreeMap, BTreeSet},
io::{BufWriter, Write, stderr}, io::{BufWriter, Write, stderr},
sync::Arc, sync::Arc,
time::Instant, time::{Duration, Instant},
}; };
use anyhow::Context as _; use anyhow::Context as _;
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use revive_dt_common::types::PrivateKeyAllocator; use revive_dt_common::types::PrivateKeyAllocator;
use revive_dt_core::Platform; use revive_dt_core::Platform;
use tokio::sync::Mutex; use tokio::sync::{Mutex, RwLock, Semaphore};
use tracing::{Instrument, error, info, info_span, instrument}; use tracing::{Instrument, error, info, info_span, instrument};
use revive_dt_config::{Context, TestExecutionContext}; use revive_dt_config::{Context, TestExecutionContext};
@@ -101,19 +101,39 @@ pub async fn handle_differential_tests(
))); )));
// Creating the driver and executing all of the steps. // Creating the driver and executing all of the steps.
let driver_task = futures::future::join_all(test_definitions.iter().map(|test_definition| { let semaphore = context
.concurrency_configuration
.concurrency_limit()
.map(Semaphore::new)
.map(Arc::new);
let running_task_list = Arc::new(RwLock::new(BTreeSet::<usize>::new()));
let driver_task = futures::future::join_all(test_definitions.iter().enumerate().map(
|(test_id, test_definition)| {
let running_task_list = running_task_list.clone();
let semaphore = semaphore.clone();
let private_key_allocator = private_key_allocator.clone(); let private_key_allocator = private_key_allocator.clone();
let cached_compiler = cached_compiler.clone(); let cached_compiler = cached_compiler.clone();
let mode = test_definition.mode.clone(); let mode = test_definition.mode.clone();
let span = info_span!( let span = info_span!(
"Executing Test Case", "Executing Test Case",
test_id,
metadata_file_path = %test_definition.metadata_file_path.display(), metadata_file_path = %test_definition.metadata_file_path.display(),
case_idx = %test_definition.case_idx, case_idx = %test_definition.case_idx,
mode = %mode mode = %mode,
); );
async move { async move {
let driver = let permit = match semaphore.as_ref() {
match Driver::new_root(test_definition, private_key_allocator, &cached_compiler) Some(semaphore) => Some(semaphore.acquire().await.expect("Can't fail")),
None => None,
};
running_task_list.write().await.insert(test_id);
let driver = match Driver::new_root(
test_definition,
private_key_allocator,
&cached_compiler,
)
.await .await
{ {
Ok(driver) => driver, Ok(driver) => driver,
@@ -123,6 +143,8 @@ pub async fn handle_differential_tests(
.report_test_failed_event(format!("{error:#}")) .report_test_failed_event(format!("{error:#}"))
.expect("Can't fail"); .expect("Can't fail");
error!("Test Case Failed"); error!("Test Case Failed");
drop(permit);
running_task_list.write().await.remove(&test_id);
return; return;
} }
}; };
@@ -141,10 +163,13 @@ pub async fn handle_differential_tests(
error!("Test Case Failed"); error!("Test Case Failed");
} }
}; };
info!("Finished the execution of the test case") info!("Finished the execution of the test case");
drop(permit);
running_task_list.write().await.remove(&test_id);
} }
.instrument(span) .instrument(span)
})) },
))
.inspect(|_| { .inspect(|_| {
info!("Finished executing all test cases"); info!("Finished executing all test cases");
reporter_clone reporter_clone
@@ -153,6 +178,18 @@ pub async fn handle_differential_tests(
}); });
let cli_reporting_task = start_cli_reporting_task(reporter); let cli_reporting_task = start_cli_reporting_task(reporter);
tokio::task::spawn(async move {
loop {
let remaining_tasks = running_task_list.read().await;
info!(
count = remaining_tasks.len(),
?remaining_tasks,
"Remaining Tests"
);
tokio::time::sleep(Duration::from_secs(10)).await
}
});
futures::future::join(driver_task, cli_reporting_task).await; futures::future::join(driver_task, cli_reporting_task).await;
Ok(()) Ok(())
+1 -1
View File
@@ -201,7 +201,7 @@ async fn compile_contracts(
// Puts a limit on how many compilations we can perform at any given instance which helps us // Puts a limit on how many compilations we can perform at any given instance which helps us
// with some of the errors we've been seeing with high concurrency on MacOS (we have not tried // with some of the errors we've been seeing with high concurrency on MacOS (we have not tried
// it on Linux so we don't know if these issues also persist there or not.) // it on Linux so we don't know if these issues also persist there or not.)
static SPAWN_GATE: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(100)); static SPAWN_GATE: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(5));
let _permit = SPAWN_GATE.acquire().await?; let _permit = SPAWN_GATE.acquire().await?;
let all_sources_in_dir = FilesWithExtensionIterator::new(metadata_directory.as_ref()) let all_sources_in_dir = FilesWithExtensionIterator::new(metadata_directory.as_ref())
@@ -17,7 +17,7 @@ use std::{
pin::Pin, pin::Pin,
process::{Command, Stdio}, process::{Command, Stdio},
sync::{ sync::{
Arc, LazyLock, Arc,
atomic::{AtomicU32, Ordering}, atomic::{AtomicU32, Ordering},
}, },
time::{Duration, SystemTime, UNIX_EPOCH}, time::{Duration, SystemTime, UNIX_EPOCH},
@@ -47,7 +47,7 @@ use futures::{Stream, StreamExt};
use revive_common::EVMVersion; use revive_common::EVMVersion;
use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as; use serde_with::serde_as;
use tokio::sync::{OnceCell, Semaphore}; use tokio::sync::OnceCell;
use tracing::{Instrument, info, instrument}; use tracing::{Instrument, info, instrument};
use revive_dt_common::{ use revive_dt_common::{
@@ -105,7 +105,6 @@ pub struct LighthouseGethNode {
persistent_http_provider: OnceCell<ConcreteProvider<Ethereum, Arc<EthereumWallet>>>, persistent_http_provider: OnceCell<ConcreteProvider<Ethereum, Arc<EthereumWallet>>>,
persistent_ws_provider: OnceCell<ConcreteProvider<Ethereum, Arc<EthereumWallet>>>, persistent_ws_provider: OnceCell<ConcreteProvider<Ethereum, Arc<EthereumWallet>>>,
http_provider_requests_semaphore: LazyLock<Semaphore>,
} }
impl LighthouseGethNode { impl LighthouseGethNode {
@@ -176,7 +175,6 @@ impl LighthouseGethNode {
nonce_manager: Default::default(), nonce_manager: Default::default(),
persistent_http_provider: OnceCell::const_new(), persistent_http_provider: OnceCell::const_new(),
persistent_ws_provider: OnceCell::const_new(), persistent_ws_provider: OnceCell::const_new(),
http_provider_requests_semaphore: LazyLock::new(|| Semaphore::const_new(500)),
} }
} }
@@ -566,8 +564,6 @@ impl EthereumNode for LighthouseGethNode {
transaction: TransactionRequest, transaction: TransactionRequest,
) -> Pin<Box<dyn Future<Output = anyhow::Result<TxHash>> + '_>> { ) -> Pin<Box<dyn Future<Output = anyhow::Result<TxHash>> + '_>> {
Box::pin(async move { Box::pin(async move {
let _permit = self.http_provider_requests_semaphore.acquire().await;
let provider = self let provider = self
.http_provider() .http_provider()
.await .await
@@ -54,7 +54,10 @@ use crate::{
Node, Node,
constants::{CHAIN_ID, INITIAL_BALANCE}, constants::{CHAIN_ID, INITIAL_BALANCE},
helpers::{Process, ProcessReadinessWaitBehavior}, helpers::{Process, ProcessReadinessWaitBehavior},
provider_utils::{ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider}, provider_utils::{
ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider,
execute_transaction,
},
}; };
static NODE_COUNT: AtomicU32 = AtomicU32::new(0); static NODE_COUNT: AtomicU32 = AtomicU32::new(0);
@@ -80,7 +83,7 @@ pub struct SubstrateNode {
} }
impl SubstrateNode { impl SubstrateNode {
const BASE_DIRECTORY: &str = "Substrate"; const BASE_DIRECTORY: &str = "substrate";
const LOGS_DIRECTORY: &str = "logs"; const LOGS_DIRECTORY: &str = "logs";
const DATA_DIRECTORY: &str = "chains"; const DATA_DIRECTORY: &str = "chains";
@@ -346,7 +349,7 @@ impl SubstrateNode {
.get_or_try_init(|| async move { .get_or_try_init(|| async move {
construct_concurrency_limited_provider::<ReviveNetwork, _>( construct_concurrency_limited_provider::<ReviveNetwork, _>(
self.rpc_url.as_str(), self.rpc_url.as_str(),
FallbackGasFiller::new(250_000_000, 5_000_000_000, 1_000_000_000), FallbackGasFiller::new(u64::MAX, 5_000_000_000, 1_000_000_000),
ChainIdFiller::new(Some(CHAIN_ID)), ChainIdFiller::new(Some(CHAIN_ID)),
NonceFiller::new(self.nonce_manager.clone()), NonceFiller::new(self.nonce_manager.clone()),
self.wallet.clone(), self.wallet.clone(),
@@ -408,23 +411,12 @@ impl EthereumNode for SubstrateNode {
&self, &self,
transaction: TransactionRequest, transaction: TransactionRequest,
) -> Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + '_>> { ) -> Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + '_>> {
static SEMAPHORE: std::sync::LazyLock<tokio::sync::Semaphore> =
std::sync::LazyLock::new(|| tokio::sync::Semaphore::new(500));
Box::pin(async move { Box::pin(async move {
let _permit = SEMAPHORE.acquire().await?; let provider = self
let receipt = self
.provider() .provider()
.await .await
.context("Failed to create provider for transaction submission")? .context("Failed to create the provider")?;
.send_transaction(transaction) execute_transaction(provider, transaction).await
.await
.context("Failed to submit transaction to substrate proxy")?
.get_receipt()
.await
.context("Failed to fetch transaction receipt from substrate proxy")?;
Ok(receipt)
}) })
} }
+68 -3
View File
@@ -1,14 +1,16 @@
use std::sync::LazyLock; use std::{ops::ControlFlow, sync::LazyLock, time::Duration};
use alloy::{ use alloy::{
network::{Network, NetworkWallet, TransactionBuilder4844}, network::{Ethereum, Network, NetworkWallet, TransactionBuilder4844},
providers::{ providers::{
Identity, ProviderBuilder, RootProvider, Identity, PendingTransactionBuilder, Provider, ProviderBuilder, RootProvider,
fillers::{ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller, WalletFiller}, fillers::{ChainIdFiller, FillProvider, JoinFill, NonceFiller, TxFiller, WalletFiller},
}, },
rpc::client::ClientBuilder, rpc::client::ClientBuilder,
}; };
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use revive_dt_common::futures::{PollingWaitBehavior, poll};
use tracing::debug;
use crate::provider_utils::{ConcurrencyLimiterLayer, FallbackGasFiller}; use crate::provider_utils::{ConcurrencyLimiterLayer, FallbackGasFiller};
@@ -61,3 +63,66 @@ where
Ok(provider) Ok(provider)
} }
pub async fn execute_transaction<N, W>(
provider: ConcreteProvider<N, W>,
transaction: N::TransactionRequest,
) -> Result<N::ReceiptResponse>
where
N: Network<
TransactionRequest: TransactionBuilder4844,
TxEnvelope = <Ethereum as Network>::TxEnvelope,
>,
W: NetworkWallet<N>,
Identity: TxFiller<N>,
FallbackGasFiller: TxFiller<N>,
ChainIdFiller: TxFiller<N>,
NonceFiller: TxFiller<N>,
WalletFiller<W>: TxFiller<N>,
{
let sendable_transaction = provider
.fill(transaction)
.await
.context("Failed to fill transaction")?;
let transaction_envelope = sendable_transaction
.try_into_envelope()
.context("Failed to convert transaction into an envelope")?;
let tx_hash = *transaction_envelope.tx_hash();
let mut pending_transaction = match provider.send_tx_envelope(transaction_envelope).await {
Ok(pending_transaction) => pending_transaction,
Err(error) => {
let error_string = error.to_string();
if error_string.contains("Transaction Already Imported") {
PendingTransactionBuilder::<N>::new(provider.root().clone(), tx_hash)
} else {
return Err(error).context(format!("Failed to submit transaction {tx_hash}"));
}
}
};
debug!(%tx_hash, "Submitted Transaction");
pending_transaction.set_timeout(Some(Duration::from_secs(120)));
let tx_hash = pending_transaction.watch().await.context(format!(
"Transaction inclusion watching timeout for {tx_hash}"
))?;
poll(
Duration::from_secs(60),
PollingWaitBehavior::Constant(Duration::from_secs(3)),
|| {
let provider = provider.clone();
async move {
match provider.get_transaction_receipt(tx_hash).await {
Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)),
_ => Ok(ControlFlow::Continue(())),
}
}
},
)
.await
.context(format!("Polling for receipt failed for {tx_hash}"))
}
+3 -4
View File
@@ -76,8 +76,6 @@ cat > "$CORPUS_FILE" << EOF
{ {
"name": "MatterLabs Solidity Simple, Complex, and Semantic Tests", "name": "MatterLabs Solidity Simple, Complex, and Semantic Tests",
"paths": [ "paths": [
"$(realpath "$TEST_REPO_DIR/fixtures/solidity/translated_semantic_tests")",
"$(realpath "$TEST_REPO_DIR/fixtures/solidity/complex")",
"$(realpath "$TEST_REPO_DIR/fixtures/solidity/simple")" "$(realpath "$TEST_REPO_DIR/fixtures/solidity/simple")"
] ]
} }
@@ -95,11 +93,12 @@ echo ""
# Run the tool # Run the tool
cargo build --release; cargo build --release;
RUST_LOG="info,alloy_pubsub::service=error" ./target/release/retester test \ RUST_LOG="info,alloy_pubsub::service=error" ./target/release/retester test \
--platform geth-evm-solc \
--platform revive-dev-node-revm-solc \ --platform revive-dev-node-revm-solc \
--corpus "$CORPUS_FILE" \ --corpus "$CORPUS_FILE" \
--working-directory "$WORKDIR" \ --working-directory "$WORKDIR" \
--concurrency.number-of-nodes 5 \ --concurrency.number-of-nodes 10 \
--concurrency.number-of-threads 5 \
--concurrency.number-of-concurrent-tasks 1000 \
--wallet.additional-keys 100000 \ --wallet.additional-keys 100000 \
--kitchensink.path "$SUBSTRATE_NODE_BIN" \ --kitchensink.path "$SUBSTRATE_NODE_BIN" \
--revive-dev-node.path "$REVIVE_DEV_NODE_BIN" \ --revive-dev-node.path "$REVIVE_DEV_NODE_BIN" \