Fix the concurrency & the substrate gas limit fallback value

This commit is contained in:
Omar Abdulla
2025-10-07 05:07:35 +03:00
parent afd75ba535
commit 9a518a01fb
6 changed files with 160 additions and 84 deletions
+9 -25
View File
@@ -31,7 +31,7 @@ use revive_dt_format::{
traits::ResolutionContext,
};
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument};
use tracing::{error, info, instrument};
use crate::{
differential_tests::ExecutionState,
@@ -109,7 +109,6 @@ impl<'a> Driver<'a, StepsIterator> {
// endregion:Constructors
// region:Execution
#[instrument(level = "info", skip_all)]
pub async fn execute_all(mut self) -> Result<usize> {
let platform_drivers = std::mem::take(&mut self.platform_drivers);
let results = futures::future::try_join_all(
@@ -218,8 +217,6 @@ where
.flatten()
.flat_map(|(_, map)| map.values())
{
debug!(%library_instance, "Deploying Library Instance");
let ContractPathAndIdent {
contract_source_path: library_source_path,
contract_ident: library_ident,
@@ -268,12 +265,6 @@ where
)
})?;
debug!(
?library_instance,
platform_identifier = %platform_information.platform.platform_identifier(),
"Deployed library"
);
let library_address = receipt
.contract_address
.expect("Failed to deploy the library");
@@ -312,7 +303,6 @@ where
// endregion:Constructors & Initialization
// region:Step Handling
#[instrument(level = "info", skip_all)]
pub async fn execute_all(mut self) -> Result<usize> {
while let Some(result) = self.execute_next_step().await {
result?
@@ -320,14 +310,6 @@ where
Ok(self.steps_executed)
}
#[instrument(
level = "info",
skip_all,
fields(
platform_identifier = %self.platform_information.platform.platform_identifier(),
node_id = self.platform_information.node.id(),
),
)]
pub async fn execute_next_step(&mut self) -> Option<Result<()>> {
let (step_path, step) = self.steps_iterator.next()?;
info!(%step_path, "Executing Step");
@@ -344,6 +326,7 @@ where
skip_all,
fields(
platform_identifier = %self.platform_information.platform.platform_identifier(),
node_id = self.platform_information.node.id(),
%step_path,
),
err(Debug),
@@ -402,6 +385,7 @@ where
Ok(1)
}
#[instrument(level = "debug", skip_all)]
async fn handle_function_call_contract_deployment(
&mut self,
step: &FunctionCallStep,
@@ -447,6 +431,7 @@ where
Ok(receipts)
}
#[instrument(level = "debug", skip_all)]
async fn handle_function_call_execution(
&mut self,
step: &FunctionCallStep,
@@ -470,14 +455,12 @@ where
}
};
match self.platform_information.node.execute_transaction(tx).await {
Ok(receipt) => Ok(receipt),
Err(err) => Err(err),
}
self.platform_information.node.execute_transaction(tx).await
}
}
}
#[instrument(level = "debug", skip_all)]
async fn handle_function_call_call_frame_tracing(
&mut self,
tx_hash: TxHash,
@@ -509,6 +492,7 @@ where
})
}
#[instrument(level = "debug", skip_all)]
async fn handle_function_call_variable_assignment(
&mut self,
step: &FunctionCallStep,
@@ -541,6 +525,7 @@ where
Ok(())
}
#[instrument(level = "debug", skip_all)]
async fn handle_function_call_assertions(
&mut self,
step: &FunctionCallStep,
@@ -583,6 +568,7 @@ where
.await
}
#[instrument(level = "debug", skip_all)]
async fn handle_function_call_assertion_item(
&self,
receipt: &TransactionReceipt,
@@ -865,7 +851,6 @@ where
level = "info",
skip_all,
fields(
platform_identifier = %self.platform_information.platform.platform_identifier(),
%contract_instance,
%deployer
),
@@ -907,7 +892,6 @@ where
level = "info",
skip_all,
fields(
platform_identifier = %self.platform_information.platform.platform_identifier(),
%contract_instance,
%deployer
),
@@ -1,17 +1,17 @@
//! The main entry point into differential testing.
use std::{
collections::BTreeMap,
collections::{BTreeMap, BTreeSet},
io::{BufWriter, Write, stderr},
sync::Arc,
time::Instant,
time::{Duration, Instant},
};
use anyhow::Context as _;
use futures::{FutureExt, StreamExt};
use revive_dt_common::types::PrivateKeyAllocator;
use revive_dt_core::Platform;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock, Semaphore};
use tracing::{Instrument, error, info, info_span, instrument};
use revive_dt_config::{Context, TestExecutionContext};
@@ -101,20 +101,40 @@ pub async fn handle_differential_tests(
)));
// Creating the driver and executing all of the steps.
let driver_task = futures::future::join_all(test_definitions.iter().map(|test_definition| {
let private_key_allocator = private_key_allocator.clone();
let cached_compiler = cached_compiler.clone();
let mode = test_definition.mode.clone();
let span = info_span!(
"Executing Test Case",
metadata_file_path = %test_definition.metadata_file_path.display(),
case_idx = %test_definition.case_idx,
mode = %mode
);
async move {
let driver =
match Driver::new_root(test_definition, private_key_allocator, &cached_compiler)
.await
let semaphore = context
.concurrency_configuration
.concurrency_limit()
.map(Semaphore::new)
.map(Arc::new);
let running_task_list = Arc::new(RwLock::new(BTreeSet::<usize>::new()));
let driver_task = futures::future::join_all(test_definitions.iter().enumerate().map(
|(test_id, test_definition)| {
let running_task_list = running_task_list.clone();
let semaphore = semaphore.clone();
let private_key_allocator = private_key_allocator.clone();
let cached_compiler = cached_compiler.clone();
let mode = test_definition.mode.clone();
let span = info_span!(
"Executing Test Case",
test_id,
metadata_file_path = %test_definition.metadata_file_path.display(),
case_idx = %test_definition.case_idx,
mode = %mode,
);
async move {
let permit = match semaphore.as_ref() {
Some(semaphore) => Some(semaphore.acquire().await.expect("Can't fail")),
None => None,
};
running_task_list.write().await.insert(test_id);
let driver = match Driver::new_root(
test_definition,
private_key_allocator,
&cached_compiler,
)
.await
{
Ok(driver) => driver,
Err(error) => {
@@ -123,28 +143,33 @@ pub async fn handle_differential_tests(
.report_test_failed_event(format!("{error:#}"))
.expect("Can't fail");
error!("Test Case Failed");
drop(permit);
running_task_list.write().await.remove(&test_id);
return;
}
};
info!("Created the driver for the test case");
info!("Created the driver for the test case");
match driver.execute_all().await {
Ok(steps_executed) => test_definition
.reporter
.report_test_succeeded_event(steps_executed)
.expect("Can't fail"),
Err(error) => {
test_definition
match driver.execute_all().await {
Ok(steps_executed) => test_definition
.reporter
.report_test_failed_event(format!("{error:#}"))
.expect("Can't fail");
error!("Test Case Failed");
}
};
info!("Finished the execution of the test case")
}
.instrument(span)
}))
.report_test_succeeded_event(steps_executed)
.expect("Can't fail"),
Err(error) => {
test_definition
.reporter
.report_test_failed_event(format!("{error:#}"))
.expect("Can't fail");
error!("Test Case Failed");
}
};
info!("Finished the execution of the test case");
drop(permit);
running_task_list.write().await.remove(&test_id);
}
.instrument(span)
},
))
.inspect(|_| {
info!("Finished executing all test cases");
reporter_clone
@@ -153,6 +178,18 @@ pub async fn handle_differential_tests(
});
let cli_reporting_task = start_cli_reporting_task(reporter);
tokio::task::spawn(async move {
loop {
let remaining_tasks = running_task_list.read().await;
info!(
count = remaining_tasks.len(),
?remaining_tasks,
"Remaining Tests"
);
tokio::time::sleep(Duration::from_secs(10)).await
}
});
futures::future::join(driver_task, cli_reporting_task).await;
Ok(())