From 4b641b947b8c40e97ef4f7a34b1cda6fc73facb7 Mon Sep 17 00:00:00 2001 From: Omar Abdulla Date: Wed, 27 Aug 2025 18:22:56 +0300 Subject: [PATCH] Add leader and follower node assignment to test --- crates/core/src/main.rs | 97 ++++++++++++++++++++--------------------- 1 file changed, 48 insertions(+), 49 deletions(-) diff --git a/crates/core/src/main.rs b/crates/core/src/main.rs index 40d3203..4258af0 100644 --- a/crates/core/src/main.rs +++ b/crates/core/src/main.rs @@ -49,13 +49,15 @@ use crate::cached_compiler::CachedCompiler; static TEMP_DIR: LazyLock = LazyLock::new(|| TempDir::new().unwrap()); /// this represents a single "test"; a mode, path and collection of cases. -#[derive(Clone, Debug)] -struct Test<'a> { +#[derive(Clone)] +struct Test<'a, L: Platform, F: Platform> { metadata: &'a MetadataFile, metadata_file_path: &'a Path, mode: Mode, case_idx: CaseIdx, case: &'a Case, + leader_node: &'a ::Blockchain, + follower_node: &'a ::Blockchain, reporter: TestSpecificReporter, } @@ -185,7 +187,18 @@ where L::Blockchain: revive_dt_node::Node + Send + Sync + 'static, F::Blockchain: revive_dt_node::Node + Send + Sync + 'static, { - let tests = prepare_tests::(args, metadata_files, reporter.clone()); + let leader_nodes = + NodePool::::new(args).context("Failed to initialize leader node pool")?; + let follower_nodes = + NodePool::::new(args).context("Failed to initialize follower node pool")?; + + let tests = prepare_tests::( + args, + metadata_files, + &leader_nodes, + &follower_nodes, + reporter.clone(), + ); let driver_task = start_driver_task::(args, tests) .await .context("Failed to start driver task")?; @@ -200,8 +213,10 @@ where fn prepare_tests<'a, L, F>( args: &Arguments, metadata_files: &'a [MetadataFile], + leader_node_pool: &'a NodePool, + follower_node_pool: &'a NodePool, reporter: Reporter, -) -> impl Stream> +) -> impl Stream> where L: Platform, F: Platform, @@ -227,17 +242,21 @@ where .into_iter() .map(move |mode| (metadata_file, case_idx, case, mode)) }) - .map(move |(metadata_file, case_idx, case, mode)| Test { - metadata: metadata_file, - metadata_file_path: metadata_file.metadata_file_path.as_path(), - mode: mode.clone(), - case_idx: CaseIdx::new(case_idx), - case, - reporter: reporter.test_specific_reporter(Arc::new(TestSpecifier { - solc_mode: mode.clone(), - metadata_file_path: metadata_file.metadata_file_path.clone(), + .map(move |(metadata_file, case_idx, case, mode)| { + Test { + metadata: metadata_file, + metadata_file_path: metadata_file.metadata_file_path.as_path(), + mode: mode.clone(), case_idx: CaseIdx::new(case_idx), - })), + case, + leader_node: leader_node_pool.round_robbin(), + follower_node: follower_node_pool.round_robbin(), + reporter: reporter.test_specific_reporter(Arc::new(TestSpecifier { + solc_mode: mode.clone(), + metadata_file_path: metadata_file.metadata_file_path.clone(), + case_idx: CaseIdx::new(case_idx), + })), + } }) .inspect(|test| { test.reporter @@ -441,7 +460,7 @@ async fn does_compiler_support_mode( async fn start_driver_task<'a, L, F>( args: &Arguments, - tests: impl Stream>, + tests: impl Stream>, ) -> anyhow::Result> where L: Platform, @@ -451,12 +470,6 @@ where { info!("Starting driver task"); - let leader_nodes = Arc::new( - NodePool::::new(args).context("Failed to initialize leader node pool")?, - ); - let follower_nodes = Arc::new( - NodePool::::new(args).context("Failed to initialize follower node pool")?, - ); let number_concurrent_tasks = args.number_of_concurrent_tasks(); let cached_compiler = Arc::new( CachedCompiler::new( @@ -477,38 +490,26 @@ where // this number will automatically be low enough to address (2). The user can override this. Some(number_concurrent_tasks), move |test| { - let leader_nodes = leader_nodes.clone(); - let follower_nodes = follower_nodes.clone(); let cached_compiler = cached_compiler.clone(); async move { - let leader_node = leader_nodes.round_robbin(); - let follower_node = follower_nodes.round_robbin(); - test.reporter .report_leader_node_assigned_event( - leader_node.id(), + test.leader_node.id(), L::config_id(), - leader_node.connection_string(), + test.leader_node.connection_string(), ) .expect("Can't fail"); test.reporter .report_follower_node_assigned_event( - follower_node.id(), + test.follower_node.id(), F::config_id(), - follower_node.connection_string(), + test.follower_node.connection_string(), ) .expect("Can't fail"); let reporter = test.reporter.clone(); - let result = handle_case_driver::( - test, - args, - cached_compiler, - leader_node, - follower_node, - ) - .await; + let result = handle_case_driver::(test, args, cached_compiler).await; match result { Ok(steps_executed) => reporter @@ -615,16 +616,14 @@ async fn start_cli_reporting_task(reporter: Reporter) { mode = %test.mode, case_idx = %test.case_idx, case_name = test.case.name.as_deref().unwrap_or("Unnamed Case"), - leader_node = leader_node.id(), - follower_node = follower_node.id(), + leader_node = test.leader_node.id(), + follower_node = test.follower_node.id(), ) )] async fn handle_case_driver( - test: Test<'_>, + test: Test<'_, L, F>, config: &Arguments, cached_compiler: Arc, - leader_node: &L::Blockchain, - follower_node: &F::Blockchain, ) -> anyhow::Result where L: Platform, @@ -634,10 +633,10 @@ where { let leader_reporter = test .reporter - .execution_specific_reporter(leader_node.id(), NodeDesignation::Leader); + .execution_specific_reporter(test.leader_node.id(), NodeDesignation::Leader); let follower_reporter = test .reporter - .execution_specific_reporter(follower_node.id(), NodeDesignation::Follower); + .execution_specific_reporter(test.follower_node.id(), NodeDesignation::Follower); let ( ( @@ -780,8 +779,8 @@ where ); let (leader_receipt, follower_receipt) = try_join!( - leader_node.execute_transaction(leader_tx), - follower_node.execute_transaction(follower_tx) + test.leader_node.execute_transaction(leader_tx), + test.follower_node.execute_transaction(follower_tx) )?; debug!( @@ -927,8 +926,8 @@ where let mut driver = CaseDriver::::new( test.metadata, test.case, - leader_node, - follower_node, + test.leader_node, + test.follower_node, leader_state, follower_state, );