Add leader and follower node assignment to test

This commit is contained in:
Omar Abdulla
2025-08-27 18:22:56 +03:00
parent 8b1afc36a3
commit 4b641b947b
+48 -49
View File
@@ -49,13 +49,15 @@ use crate::cached_compiler::CachedCompiler;
static TEMP_DIR: LazyLock<TempDir> = LazyLock::new(|| TempDir::new().unwrap()); static TEMP_DIR: LazyLock<TempDir> = LazyLock::new(|| TempDir::new().unwrap());
/// this represents a single "test"; a mode, path and collection of cases. /// this represents a single "test"; a mode, path and collection of cases.
#[derive(Clone, Debug)] #[derive(Clone)]
struct Test<'a> { struct Test<'a, L: Platform, F: Platform> {
metadata: &'a MetadataFile, metadata: &'a MetadataFile,
metadata_file_path: &'a Path, metadata_file_path: &'a Path,
mode: Mode, mode: Mode,
case_idx: CaseIdx, case_idx: CaseIdx,
case: &'a Case, case: &'a Case,
leader_node: &'a <L as Platform>::Blockchain,
follower_node: &'a <F as Platform>::Blockchain,
reporter: TestSpecificReporter, reporter: TestSpecificReporter,
} }
@@ -185,7 +187,18 @@ where
L::Blockchain: revive_dt_node::Node + Send + Sync + 'static, L::Blockchain: revive_dt_node::Node + Send + Sync + 'static,
F::Blockchain: revive_dt_node::Node + Send + Sync + 'static, F::Blockchain: revive_dt_node::Node + Send + Sync + 'static,
{ {
let tests = prepare_tests::<L, F>(args, metadata_files, reporter.clone()); let leader_nodes =
NodePool::<L::Blockchain>::new(args).context("Failed to initialize leader node pool")?;
let follower_nodes =
NodePool::<F::Blockchain>::new(args).context("Failed to initialize follower node pool")?;
let tests = prepare_tests::<L, F>(
args,
metadata_files,
&leader_nodes,
&follower_nodes,
reporter.clone(),
);
let driver_task = start_driver_task::<L, F>(args, tests) let driver_task = start_driver_task::<L, F>(args, tests)
.await .await
.context("Failed to start driver task")?; .context("Failed to start driver task")?;
@@ -200,8 +213,10 @@ where
fn prepare_tests<'a, L, F>( fn prepare_tests<'a, L, F>(
args: &Arguments, args: &Arguments,
metadata_files: &'a [MetadataFile], metadata_files: &'a [MetadataFile],
leader_node_pool: &'a NodePool<L::Blockchain>,
follower_node_pool: &'a NodePool<F::Blockchain>,
reporter: Reporter, reporter: Reporter,
) -> impl Stream<Item = Test<'a>> ) -> impl Stream<Item = Test<'a, L, F>>
where where
L: Platform, L: Platform,
F: Platform, F: Platform,
@@ -227,17 +242,21 @@ where
.into_iter() .into_iter()
.map(move |mode| (metadata_file, case_idx, case, mode)) .map(move |mode| (metadata_file, case_idx, case, mode))
}) })
.map(move |(metadata_file, case_idx, case, mode)| Test { .map(move |(metadata_file, case_idx, case, mode)| {
metadata: metadata_file, Test {
metadata_file_path: metadata_file.metadata_file_path.as_path(), metadata: metadata_file,
mode: mode.clone(), metadata_file_path: metadata_file.metadata_file_path.as_path(),
case_idx: CaseIdx::new(case_idx), mode: mode.clone(),
case,
reporter: reporter.test_specific_reporter(Arc::new(TestSpecifier {
solc_mode: mode.clone(),
metadata_file_path: metadata_file.metadata_file_path.clone(),
case_idx: CaseIdx::new(case_idx), case_idx: CaseIdx::new(case_idx),
})), case,
leader_node: leader_node_pool.round_robbin(),
follower_node: follower_node_pool.round_robbin(),
reporter: reporter.test_specific_reporter(Arc::new(TestSpecifier {
solc_mode: mode.clone(),
metadata_file_path: metadata_file.metadata_file_path.clone(),
case_idx: CaseIdx::new(case_idx),
})),
}
}) })
.inspect(|test| { .inspect(|test| {
test.reporter test.reporter
@@ -441,7 +460,7 @@ async fn does_compiler_support_mode<P: Platform>(
async fn start_driver_task<'a, L, F>( async fn start_driver_task<'a, L, F>(
args: &Arguments, args: &Arguments,
tests: impl Stream<Item = Test<'a>>, tests: impl Stream<Item = Test<'a, L, F>>,
) -> anyhow::Result<impl Future<Output = ()>> ) -> anyhow::Result<impl Future<Output = ()>>
where where
L: Platform, L: Platform,
@@ -451,12 +470,6 @@ where
{ {
info!("Starting driver task"); info!("Starting driver task");
let leader_nodes = Arc::new(
NodePool::<L::Blockchain>::new(args).context("Failed to initialize leader node pool")?,
);
let follower_nodes = Arc::new(
NodePool::<F::Blockchain>::new(args).context("Failed to initialize follower node pool")?,
);
let number_concurrent_tasks = args.number_of_concurrent_tasks(); let number_concurrent_tasks = args.number_of_concurrent_tasks();
let cached_compiler = Arc::new( let cached_compiler = Arc::new(
CachedCompiler::new( CachedCompiler::new(
@@ -477,38 +490,26 @@ where
// this number will automatically be low enough to address (2). The user can override this. // this number will automatically be low enough to address (2). The user can override this.
Some(number_concurrent_tasks), Some(number_concurrent_tasks),
move |test| { move |test| {
let leader_nodes = leader_nodes.clone();
let follower_nodes = follower_nodes.clone();
let cached_compiler = cached_compiler.clone(); let cached_compiler = cached_compiler.clone();
async move { async move {
let leader_node = leader_nodes.round_robbin();
let follower_node = follower_nodes.round_robbin();
test.reporter test.reporter
.report_leader_node_assigned_event( .report_leader_node_assigned_event(
leader_node.id(), test.leader_node.id(),
L::config_id(), L::config_id(),
leader_node.connection_string(), test.leader_node.connection_string(),
) )
.expect("Can't fail"); .expect("Can't fail");
test.reporter test.reporter
.report_follower_node_assigned_event( .report_follower_node_assigned_event(
follower_node.id(), test.follower_node.id(),
F::config_id(), F::config_id(),
follower_node.connection_string(), test.follower_node.connection_string(),
) )
.expect("Can't fail"); .expect("Can't fail");
let reporter = test.reporter.clone(); let reporter = test.reporter.clone();
let result = handle_case_driver::<L, F>( let result = handle_case_driver::<L, F>(test, args, cached_compiler).await;
test,
args,
cached_compiler,
leader_node,
follower_node,
)
.await;
match result { match result {
Ok(steps_executed) => reporter Ok(steps_executed) => reporter
@@ -615,16 +616,14 @@ async fn start_cli_reporting_task(reporter: Reporter) {
mode = %test.mode, mode = %test.mode,
case_idx = %test.case_idx, case_idx = %test.case_idx,
case_name = test.case.name.as_deref().unwrap_or("Unnamed Case"), case_name = test.case.name.as_deref().unwrap_or("Unnamed Case"),
leader_node = leader_node.id(), leader_node = test.leader_node.id(),
follower_node = follower_node.id(), follower_node = test.follower_node.id(),
) )
)] )]
async fn handle_case_driver<L, F>( async fn handle_case_driver<L, F>(
test: Test<'_>, test: Test<'_, L, F>,
config: &Arguments, config: &Arguments,
cached_compiler: Arc<CachedCompiler>, cached_compiler: Arc<CachedCompiler>,
leader_node: &L::Blockchain,
follower_node: &F::Blockchain,
) -> anyhow::Result<usize> ) -> anyhow::Result<usize>
where where
L: Platform, L: Platform,
@@ -634,10 +633,10 @@ where
{ {
let leader_reporter = test let leader_reporter = test
.reporter .reporter
.execution_specific_reporter(leader_node.id(), NodeDesignation::Leader); .execution_specific_reporter(test.leader_node.id(), NodeDesignation::Leader);
let follower_reporter = test let follower_reporter = test
.reporter .reporter
.execution_specific_reporter(follower_node.id(), NodeDesignation::Follower); .execution_specific_reporter(test.follower_node.id(), NodeDesignation::Follower);
let ( let (
( (
@@ -780,8 +779,8 @@ where
); );
let (leader_receipt, follower_receipt) = try_join!( let (leader_receipt, follower_receipt) = try_join!(
leader_node.execute_transaction(leader_tx), test.leader_node.execute_transaction(leader_tx),
follower_node.execute_transaction(follower_tx) test.follower_node.execute_transaction(follower_tx)
)?; )?;
debug!( debug!(
@@ -927,8 +926,8 @@ where
let mut driver = CaseDriver::<L, F>::new( let mut driver = CaseDriver::<L, F>::new(
test.metadata, test.metadata,
test.case, test.case,
leader_node, test.leader_node,
follower_node, test.follower_node,
leader_state, leader_state,
follower_state, follower_state,
); );