diff --git a/polkadot/node/core/av-store/src/tests.rs b/polkadot/node/core/av-store/src/tests.rs index 3ec83f8a33..78c080f367 100644 --- a/polkadot/node/core/av-store/src/tests.rs +++ b/polkadot/node/core/av-store/src/tests.rs @@ -49,9 +49,7 @@ const TEST_CONFIG: Config = Config { col_meta: columns::META, }; -struct TestHarness { - virtual_overseer: test_helpers::TestSubsystemContextHandle, -} +type VirtualOverseer = test_helpers::TestSubsystemContextHandle; #[derive(Default)] struct TestCandidateBuilder { @@ -140,10 +138,10 @@ impl Default for TestState { } -fn test_harness>( +fn test_harness>( state: TestState, store: Arc, - test: impl FnOnce(TestHarness) -> T, + test: impl FnOnce(VirtualOverseer) -> T, ) { let _ = env_logger::builder() .is_test(true) @@ -170,20 +168,24 @@ fn test_harness>( let subsystem = run(subsystem, context); - let test_fut = test(TestHarness { - virtual_overseer, - }); + let test_fut = test(virtual_overseer); futures::pin_mut!(test_fut); futures::pin_mut!(subsystem); - executor::block_on(future::select(test_fut, subsystem)); + executor::block_on(future::join(async move { + let mut overseer = test_fut.await; + overseer_signal( + &mut overseer, + OverseerSignal::Conclude, + ).await; + }, subsystem)); } const TIMEOUT: Duration = Duration::from_millis(100); async fn overseer_send( - overseer: &mut test_helpers::TestSubsystemContextHandle, + overseer: &mut VirtualOverseer, msg: AvailabilityStoreMessage, ) { tracing::trace!(meg = ?msg, "sending message"); @@ -195,7 +197,7 @@ async fn overseer_send( } async fn overseer_recv( - overseer: &mut test_helpers::TestSubsystemContextHandle, + overseer: &mut VirtualOverseer, ) -> AllMessages { let msg = overseer_recv_with_timeout(overseer, TIMEOUT) .await @@ -207,7 +209,7 @@ async fn overseer_recv( } async fn overseer_recv_with_timeout( - overseer: &mut test_helpers::TestSubsystemContextHandle, + overseer: &mut VirtualOverseer, timeout: Duration, ) -> Option { tracing::trace!("waiting for message..."); @@ -218,7 +220,7 @@ async fn overseer_recv_with_timeout( } async fn overseer_signal( - overseer: &mut test_helpers::TestSubsystemContextHandle, + overseer: &mut VirtualOverseer, signal: OverseerSignal, ) { overseer @@ -247,8 +249,7 @@ fn candidate_included(receipt: CandidateReceipt) -> CandidateEvent { fn runtime_api_error_does_not_stop_the_subsystem() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); - test_harness(TestState::default(), store, |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; + test_harness(TestState::default(), store, |mut virtual_overseer| async move { let new_leaf = Hash::repeat_byte(0x01); overseer_signal( @@ -288,15 +289,14 @@ fn runtime_api_error_does_not_stop_the_subsystem() { overseer_send(&mut virtual_overseer, query_chunk.into()).await; assert!(rx.await.unwrap().is_none()); - + virtual_overseer }); } #[test] fn store_chunk_works() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); - test_harness(TestState::default(), store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; + test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move { let candidate_hash = CandidateHash(Hash::repeat_byte(33)); let validator_index = ValidatorIndex(5); let n_validators = 10; @@ -338,6 +338,7 @@ fn store_chunk_works() { overseer_send(&mut virtual_overseer, query_chunk.into()).await; assert_eq!(rx.await.unwrap().unwrap(), chunk); + virtual_overseer }); } @@ -345,8 +346,7 @@ fn store_chunk_works() { #[test] fn store_chunk_does_nothing_if_no_entry_already() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); - test_harness(TestState::default(), store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; + test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move { let candidate_hash = CandidateHash(Hash::repeat_byte(33)); let validator_index = ValidatorIndex(5); @@ -377,14 +377,14 @@ fn store_chunk_does_nothing_if_no_entry_already() { overseer_send(&mut virtual_overseer, query_chunk.into()).await; assert!(rx.await.unwrap().is_none()); + virtual_overseer }); } #[test] fn query_chunk_checks_meta() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); - test_harness(TestState::default(), store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; + test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move { let candidate_hash = CandidateHash(Hash::repeat_byte(33)); let validator_index = ValidatorIndex(5); let n_validators = 10; @@ -422,6 +422,7 @@ fn query_chunk_checks_meta() { overseer_send(&mut virtual_overseer, query_chunk.into()).await; assert!(!rx.await.unwrap()); + virtual_overseer }); } @@ -429,8 +430,7 @@ fn query_chunk_checks_meta() { fn store_block_works() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); let test_state = TestState::default(); - test_harness(test_state.clone(), store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; + test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move { let candidate_hash = CandidateHash(Hash::repeat_byte(1)); let validator_index = ValidatorIndex(5); let n_validators = 10; @@ -474,6 +474,7 @@ fn store_block_works() { }; assert_eq!(chunk, expected_chunk); + virtual_overseer }); } @@ -482,8 +483,7 @@ fn store_pov_and_query_chunk_works() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); let test_state = TestState::default(); - test_harness(test_state.clone(), store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; + test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move { let candidate_hash = CandidateHash(Hash::repeat_byte(1)); let n_validators = 10; @@ -516,6 +516,7 @@ fn store_pov_and_query_chunk_works() { assert_eq!(chunk.chunk, chunks_expected[i as usize]); } + virtual_overseer }); } @@ -524,9 +525,7 @@ fn query_all_chunks_works() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); let test_state = TestState::default(); - test_harness(test_state.clone(), store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move { // all chunks for hash 1. // 1 chunk for hash 2. // 0 chunks for hash 3. @@ -608,6 +607,7 @@ fn query_all_chunks_works() { virtual_overseer.send(FromOverseer::Communication { msg }).await; assert_eq!(rx.await.unwrap().len(), 0); } + virtual_overseer }); } @@ -616,8 +616,7 @@ fn stored_but_not_included_data_is_pruned() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); let test_state = TestState::default(); - test_harness(test_state.clone(), store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; + test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move { let candidate_hash = CandidateHash(Hash::repeat_byte(1)); let n_validators = 10; @@ -655,6 +654,7 @@ fn stored_but_not_included_data_is_pruned() { // The block was not included by this point so it should be pruned now. assert!(query_available_data(&mut virtual_overseer, candidate_hash).await.is_none()); + virtual_overseer }); } @@ -663,8 +663,7 @@ fn stored_data_kept_until_finalized() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); let test_state = TestState::default(); - test_harness(test_state.clone(), store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; + test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move { let n_validators = 10; let pov = PoV { @@ -760,6 +759,7 @@ fn stored_data_kept_until_finalized() { assert!( has_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, false).await ); + virtual_overseer }); } @@ -768,8 +768,7 @@ fn forkfullness_works() { let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); let test_state = TestState::default(); - test_harness(test_state.clone(), store.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; + test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move { let n_validators = 10; let block_number_1 = 5; let block_number_2 = 5; @@ -930,11 +929,12 @@ fn forkfullness_works() { assert!( has_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await, ); + virtual_overseer }); } async fn query_available_data( - virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + virtual_overseer: &mut VirtualOverseer, candidate_hash: CandidateHash, ) -> Option { let (tx, rx) = oneshot::channel(); @@ -946,7 +946,7 @@ async fn query_available_data( } async fn query_chunk( - virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + virtual_overseer: &mut VirtualOverseer, candidate_hash: CandidateHash, index: ValidatorIndex, ) -> Option { @@ -959,7 +959,7 @@ async fn query_chunk( } async fn has_all_chunks( - virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + virtual_overseer: &mut VirtualOverseer, candidate_hash: CandidateHash, n_validators: u32, expect_present: bool, @@ -973,7 +973,7 @@ async fn has_all_chunks( } async fn import_leaf( - virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + virtual_overseer: &mut VirtualOverseer, parent_hash: Hash, block_number: BlockNumber, events: Vec, diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 7538a211e6..585bd8cee3 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -1353,6 +1353,7 @@ mod tests { ActiveLeavesUpdate, FromOverseer, OverseerSignal, ActivatedLeaf, }; use polkadot_node_primitives::{InvalidCandidate, BlockData}; + use polkadot_node_subsystem_test_helpers as test_helpers; use sp_keyring::Sr25519Keyring; use sp_application_crypto::AppKey; use sp_keystore::{CryptoStore, SyncCryptoStore}; @@ -1466,14 +1467,16 @@ mod tests { } } - struct TestHarness { - virtual_overseer: polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle, - } + type VirtualOverseer = test_helpers::TestSubsystemContextHandle; - fn test_harness>(keystore: SyncCryptoStorePtr, test: impl FnOnce(TestHarness) -> T) { + fn test_harness>( + keystore: SyncCryptoStorePtr, + test: impl FnOnce(VirtualOverseer) -> T, + ) { let pool = sp_core::testing::TaskExecutor::new(); - let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool.clone()); + let (context, virtual_overseer) = + test_helpers::make_subsystem_context(pool.clone()); let subsystem = CandidateBackingSubsystem::new( pool.clone(), @@ -1481,13 +1484,16 @@ mod tests { Metrics(None), ).run(context); - let test_fut = test(TestHarness { - virtual_overseer, - }); + let test_fut = test(virtual_overseer); futures::pin_mut!(test_fut); futures::pin_mut!(subsystem); - futures::executor::block_on(future::select(test_fut, subsystem)); + futures::executor::block_on(future::join(async move { + let mut virtual_overseer = test_fut.await; + virtual_overseer.send(FromOverseer::Signal( + OverseerSignal::Conclude, + )).await; + }, subsystem)); } fn make_erasure_root(test: &TestState, pov: PoV) -> Hash { @@ -1529,7 +1535,7 @@ mod tests { // Tests that the subsystem performs actions that are requied on startup. async fn test_startup( - virtual_overseer: &mut polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle, + virtual_overseer: &mut VirtualOverseer, test_state: &TestState, ) { // Start work on some new parent. @@ -1587,9 +1593,7 @@ mod tests { #[test] fn backing_second_works() { let test_state = TestState::default(); - test_harness(test_state.keystore.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { test_startup(&mut virtual_overseer, &test_state).await; let pov = PoV { @@ -1674,6 +1678,7 @@ mod tests { virtual_overseer.send(FromOverseer::Signal( OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) ).await; + virtual_overseer }); } @@ -1681,9 +1686,7 @@ mod tests { #[test] fn backing_works() { let test_state = TestState::default(); - test_harness(test_state.keystore.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { test_startup(&mut virtual_overseer, &test_state).await; let pov = PoV { @@ -1820,15 +1823,14 @@ mod tests { virtual_overseer.send(FromOverseer::Signal( OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) ).await; + virtual_overseer }); } #[test] fn backing_works_while_validation_ongoing() { let test_state = TestState::default(); - test_harness(test_state.keystore.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { test_startup(&mut virtual_overseer, &test_state).await; let pov = PoV { @@ -1983,6 +1985,7 @@ mod tests { virtual_overseer.send(FromOverseer::Signal( OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) ).await; + virtual_overseer }); } @@ -1991,9 +1994,7 @@ mod tests { #[test] fn backing_misbehavior_works() { let test_state = TestState::default(); - test_harness(test_state.keystore.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { test_startup(&mut virtual_overseer, &test_state).await; let pov = PoV { @@ -2137,6 +2138,7 @@ mod tests { ).expect("signature must be valid"); } ); + virtual_overseer }); } @@ -2145,9 +2147,7 @@ mod tests { #[test] fn backing_dont_second_invalid() { let test_state = TestState::default(); - test_harness(test_state.keystore.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { test_startup(&mut virtual_overseer, &test_state).await; let pov_block_a = PoV { @@ -2268,6 +2268,7 @@ mod tests { virtual_overseer.send(FromOverseer::Signal( OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) ).await; + virtual_overseer }); } @@ -2276,9 +2277,7 @@ mod tests { #[test] fn backing_second_after_first_fails_works() { let test_state = TestState::default(); - test_harness(test_state.keystore.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { test_startup(&mut virtual_overseer, &test_state).await; let pov = PoV { @@ -2392,6 +2391,7 @@ mod tests { assert_eq!(&*pov, &pov_to_second); } ); + virtual_overseer }); } @@ -2400,9 +2400,7 @@ mod tests { #[test] fn backing_works_after_failed_validation() { let test_state = TestState::default(); - test_harness(test_state.keystore.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { test_startup(&mut virtual_overseer, &test_state).await; let pov = PoV { @@ -2478,6 +2476,7 @@ mod tests { virtual_overseer.send(FromOverseer::Communication{ msg }).await; assert_eq!(rx.await.unwrap().len(), 0); + virtual_overseer }); } @@ -2491,9 +2490,7 @@ mod tests { collator: Some(Sr25519Keyring::Bob.public().into()), }); - test_harness(test_state.keystore.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { test_startup(&mut virtual_overseer, &test_state).await; let pov = PoV { @@ -2531,6 +2528,7 @@ mod tests { virtual_overseer.send(FromOverseer::Signal( OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) ).await; + virtual_overseer }); } @@ -2542,9 +2540,7 @@ mod tests { collator: Some(Sr25519Keyring::Bob.public().into()), }); - test_harness(test_state.keystore.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { test_startup(&mut virtual_overseer, &test_state).await; let pov = PoV { @@ -2587,6 +2583,7 @@ mod tests { virtual_overseer.send(FromOverseer::Signal( OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) ).await; + virtual_overseer }); } @@ -2672,9 +2669,7 @@ mod tests { fn retry_works() { // sp_tracing::try_init_simple(); let test_state = TestState::default(); - test_harness(test_state.keystore.clone(), |test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { test_startup(&mut virtual_overseer, &test_state).await; let pov = PoV { @@ -2817,6 +2812,7 @@ mod tests { ) ) if pov == pov && &c == candidate.descriptor() ); + virtual_overseer }); } } diff --git a/polkadot/node/core/provisioner/src/tests.rs b/polkadot/node/core/provisioner/src/tests.rs index 1842b67559..8c5587ab14 100644 --- a/polkadot/node/core/provisioner/src/tests.rs +++ b/polkadot/node/core/provisioner/src/tests.rs @@ -219,7 +219,7 @@ mod select_candidates { futures::pin_mut!(overseer, test); - let _ = futures::executor::block_on(future::select(overseer, test)); + let _ = futures::executor::block_on(future::join(overseer, test)); } // For test purposes, we always return this set of availability cores: diff --git a/polkadot/node/network/approval-distribution/src/tests.rs b/polkadot/node/network/approval-distribution/src/tests.rs index e420ca1ea8..8b33082e5d 100644 --- a/polkadot/node/network/approval-distribution/src/tests.rs +++ b/polkadot/node/network/approval-distribution/src/tests.rs @@ -27,7 +27,7 @@ use super::*; type VirtualOverseer = test_helpers::TestSubsystemContextHandle; -fn test_harness>( +fn test_harness>( mut state: State, test_fn: impl FnOnce(VirtualOverseer) -> T, ) -> State { @@ -51,7 +51,14 @@ fn test_harness>( futures::pin_mut!(test_fut); futures::pin_mut!(subsystem); - executor::block_on(future::select(test_fut, subsystem)); + executor::block_on(future::join(async move { + let mut overseer = test_fut.await; + overseer + .send(FromOverseer::Signal(OverseerSignal::Conclude)) + .timeout(TIMEOUT) + .await + .expect("Conclude send timeout"); + }, subsystem)); } state @@ -262,6 +269,7 @@ fn try_import_the_same_assignment() { .is_none(), "no message should be sent", ); + virtual_overseer }); } @@ -342,6 +350,7 @@ fn spam_attack_results_in_negative_reputation_change() { expect_reputation_change(overseer, peer, COST_UNEXPECTED_MESSAGE).await; expect_reputation_change(overseer, peer, BENEFIT_VALID_MESSAGE).await; } + virtual_overseer }); } @@ -423,6 +432,7 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() { // now we should expect_reputation_change(overseer, peer, COST_DUPLICATE_MESSAGE).await; + virtual_overseer }); } @@ -509,6 +519,7 @@ fn import_approval_happy_path() { assert_eq!(approvals.len(), 1); } ); + virtual_overseer }); } @@ -588,6 +599,7 @@ fn import_approval_bad() { ); expect_reputation_change(overseer, &peer_b, COST_INVALID_MESSAGE).await; + virtual_overseer }); } @@ -626,6 +638,7 @@ fn update_our_view() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]); overseer_send(overseer, msg).await; + virtual_overseer }); assert!(state.blocks_by_number.get(&1).is_some()); @@ -639,6 +652,7 @@ fn update_our_view() { let overseer = &mut virtual_overseer; // finalize a block overseer_signal_block_finalized(overseer, 2).await; + virtual_overseer }); assert!(state.blocks_by_number.get(&1).is_none()); @@ -652,6 +666,7 @@ fn update_our_view() { let overseer = &mut virtual_overseer; // finalize a very high block overseer_signal_block_finalized(overseer, 4_000_000_000).await; + virtual_overseer }); assert!(state.blocks_by_number.get(&3).is_none()); @@ -726,6 +741,7 @@ fn update_peer_view() { assert_eq!(assignments.len(), 1); } ); + virtual_overseer }); assert_eq!(state.peer_views.get(peer).map(|v| v.finalized_number), Some(0)); @@ -773,6 +789,7 @@ fn update_peer_view() { assert_eq!(assignments[0].0, cert_c); } ); + virtual_overseer }); assert_eq!(state.peer_views.get(peer).map(|v| v.finalized_number), Some(2)); @@ -799,6 +816,7 @@ fn update_peer_view() { NetworkBridgeEvent::PeerViewChange(peer.clone(), View::with_finalized(finalized_number)) ) ).await; + virtual_overseer }); assert_eq!(state.peer_views.get(peer).map(|v| v.finalized_number), Some(finalized_number)); @@ -909,6 +927,7 @@ fn import_remotely_then_locally() { .is_none(), "no message should be sent", ); + virtual_overseer }); } @@ -994,5 +1013,6 @@ fn sends_assignments_even_when_state_is_approved() { .is_none(), "no message should be sent", ); + virtual_overseer }); } diff --git a/polkadot/node/network/availability-distribution/src/tests/mod.rs b/polkadot/node/network/availability-distribution/src/tests/mod.rs index ed793e18b1..77b47e31e7 100644 --- a/polkadot/node/network/availability-distribution/src/tests/mod.rs +++ b/polkadot/node/network/availability-distribution/src/tests/mod.rs @@ -50,7 +50,7 @@ fn test_harness>( futures::pin_mut!(test_fut); futures::pin_mut!(subsystem); - executor::block_on(future::select(test_fut, subsystem)); + executor::block_on(future::join(test_fut, subsystem)).1.unwrap(); } } diff --git a/polkadot/node/network/availability-distribution/src/tests/state.rs b/polkadot/node/network/availability-distribution/src/tests/state.rs index 2adbdaba84..73fcd5f7ce 100644 --- a/polkadot/node/network/availability-distribution/src/tests/state.rs +++ b/polkadot/node/network/availability-distribution/src/tests/state.rs @@ -50,8 +50,9 @@ use test_helpers::SingleItemSink; use super::mock::{make_session_info, OccupiedCoreBuilder, make_ferdie_keystore}; use crate::LOG_TARGET; +type VirtualOverseer = test_helpers::TestSubsystemContextHandle; pub struct TestHarness { - pub virtual_overseer: test_helpers::TestSubsystemContextHandle, + pub virtual_overseer: VirtualOverseer, pub pool: TaskExecutor, } @@ -161,7 +162,7 @@ impl TestState { /// /// We try to be as agnostic about details as possible, how the subsystem achieves those goals /// should not be a matter to this test suite. - async fn run_inner(mut self, executor: TaskExecutor, virtual_overseer: TestSubsystemContextHandle) { + async fn run_inner(mut self, executor: TaskExecutor, virtual_overseer: VirtualOverseer) { // We skip genesis here (in reality ActiveLeavesUpdate can also skip a block: let updates = { let mut advanced = self.relay_chain.iter(); @@ -198,8 +199,7 @@ impl TestState { // cancel jobs as obsolete: Delay::new(Duration::from_millis(20)).await; } - }.boxed() - ); + }.boxed()); while remaining_stores > 0 { @@ -211,11 +211,12 @@ impl TestState { // Forward requests: let in_req = to_incoming_req(&executor, req); - executor.spawn("Request forwarding", - overseer_send( - tx.clone(), - AvailabilityDistributionMessage::ChunkFetchingRequest(in_req) - ).boxed() + executor.spawn( + "Request forwarding", + overseer_send( + tx.clone(), + AvailabilityDistributionMessage::ChunkFetchingRequest(in_req) + ).boxed() ); } } @@ -259,6 +260,8 @@ impl TestState { } } } + + overseer_signal(tx, OverseerSignal::Conclude).await; } } diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs index 028928f853..a20f3829c5 100644 --- a/polkadot/node/network/availability-recovery/src/tests.rs +++ b/polkadot/node/network/availability-recovery/src/tests.rs @@ -37,12 +37,8 @@ use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaege type VirtualOverseer = test_helpers::TestSubsystemContextHandle; -struct TestHarness { - virtual_overseer: VirtualOverseer, -} - -fn test_harness_fast_path>( - test: impl FnOnce(TestHarness) -> T, +fn test_harness_fast_path>( + test: impl FnOnce(VirtualOverseer) -> T, ) { let _ = env_logger::builder() .is_test(true) @@ -59,16 +55,19 @@ fn test_harness_fast_path>( let subsystem = AvailabilityRecoverySubsystem::with_fast_path(); let subsystem = subsystem.run(context); - let test_fut = test(TestHarness { virtual_overseer }); + let test_fut = test(virtual_overseer); futures::pin_mut!(test_fut); futures::pin_mut!(subsystem); - executor::block_on(future::select(test_fut, subsystem)); + executor::block_on(future::join(async move { + let mut overseer = test_fut.await; + overseer_signal(&mut overseer, OverseerSignal::Conclude).await; + }, subsystem)).1.unwrap(); } -fn test_harness_chunks_only>( - test: impl FnOnce(TestHarness) -> T, +fn test_harness_chunks_only>( + test: impl FnOnce(VirtualOverseer) -> T, ) { let _ = env_logger::builder() .is_test(true) @@ -85,12 +84,15 @@ fn test_harness_chunks_only>( let subsystem = AvailabilityRecoverySubsystem::with_chunks_only(); let subsystem = subsystem.run(context); - let test_fut = test(TestHarness { virtual_overseer }); + let test_fut = test(virtual_overseer); futures::pin_mut!(test_fut); futures::pin_mut!(subsystem); - executor::block_on(future::select(test_fut, subsystem)); + executor::block_on(future::join(async move { + let mut overseer = test_fut.await; + overseer_signal(&mut overseer, OverseerSignal::Conclude).await; + }, subsystem)).1.unwrap(); } const TIMEOUT: Duration = Duration::from_millis(100); @@ -439,9 +441,7 @@ impl Default for TestState { fn availability_is_recovered_from_chunks_if_no_group_provided() { let test_state = TestState::default(); - test_harness_fast_path(|test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness_fast_path(|mut virtual_overseer| async move { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -514,6 +514,7 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() { // A request times out with `Unavailable` error. assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable); + virtual_overseer }); } @@ -521,9 +522,7 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() { fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunks_only() { let test_state = TestState::default(); - test_harness_chunks_only(|test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness_chunks_only(|mut virtual_overseer| async move { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -596,6 +595,7 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk // A request times out with `Unavailable` error. assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable); + virtual_overseer }); } @@ -603,9 +603,7 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk fn bad_merkle_path_leads_to_recovery_error() { let mut test_state = TestState::default(); - test_harness_fast_path(|test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness_fast_path(|mut virtual_overseer| async move { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -653,6 +651,7 @@ fn bad_merkle_path_leads_to_recovery_error() { // A request times out with `Unavailable` error. assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable); + virtual_overseer }); } @@ -660,9 +659,7 @@ fn bad_merkle_path_leads_to_recovery_error() { fn wrong_chunk_index_leads_to_recovery_error() { let mut test_state = TestState::default(); - test_harness_fast_path(|test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness_fast_path(|mut virtual_overseer| async move { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -709,6 +706,7 @@ fn wrong_chunk_index_leads_to_recovery_error() { // A request times out with `Unavailable` error as there are no good peers. assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable); + virtual_overseer }); } @@ -716,9 +714,7 @@ fn wrong_chunk_index_leads_to_recovery_error() { fn invalid_erasure_coding_leads_to_invalid_error() { let mut test_state = TestState::default(); - test_harness_fast_path(|test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness_fast_path(|mut virtual_overseer| async move { let pov = PoV { block_data: BlockData(vec![69; 64]), }; @@ -775,6 +771,7 @@ fn invalid_erasure_coding_leads_to_invalid_error() { // f+1 'valid' chunks can't produce correct data. assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Invalid); + virtual_overseer }); } @@ -782,9 +779,7 @@ fn invalid_erasure_coding_leads_to_invalid_error() { fn fast_path_backing_group_recovers() { let test_state = TestState::default(); - test_harness_fast_path(|test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness_fast_path(|mut virtual_overseer| async move { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -828,6 +823,7 @@ fn fast_path_backing_group_recovers() { // Recovered data should match the original one. assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); + virtual_overseer }); } @@ -835,9 +831,7 @@ fn fast_path_backing_group_recovers() { fn no_answers_in_fast_path_causes_chunk_requests() { let test_state = TestState::default(); - test_harness_fast_path(|test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness_fast_path(|mut virtual_overseer| async move { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -891,6 +885,7 @@ fn no_answers_in_fast_path_causes_chunk_requests() { // Recovered data should match the original one. assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); + virtual_overseer }); } @@ -898,9 +893,7 @@ fn no_answers_in_fast_path_causes_chunk_requests() { fn task_canceled_when_receivers_dropped() { let test_state = TestState::default(); - test_harness_chunks_only(|test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness_chunks_only(|mut virtual_overseer| async move { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -929,7 +922,7 @@ fn task_canceled_when_receivers_dropped() { for _ in 0..test_state.validators.len() { match virtual_overseer.recv().timeout(TIMEOUT).await { - None => return, + None => return virtual_overseer, Some(_) => continue, } } @@ -942,9 +935,7 @@ fn task_canceled_when_receivers_dropped() { fn chunks_retry_until_all_nodes_respond() { let test_state = TestState::default(); - test_harness_chunks_only(|test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness_chunks_only(|mut virtual_overseer| async move { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -994,6 +985,7 @@ fn chunks_retry_until_all_nodes_respond() { // Recovered data should match the original one. assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable); + virtual_overseer }); } @@ -1001,9 +993,7 @@ fn chunks_retry_until_all_nodes_respond() { fn returns_early_if_we_have_the_data() { let test_state = TestState::default(); - test_harness_chunks_only(|test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness_chunks_only(|mut virtual_overseer| async move { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -1032,6 +1022,7 @@ fn returns_early_if_we_have_the_data() { test_state.respond_to_available_data_query(&mut virtual_overseer, true).await; assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); + virtual_overseer }); } @@ -1039,9 +1030,7 @@ fn returns_early_if_we_have_the_data() { fn does_not_query_local_validator() { let test_state = TestState::default(); - test_harness_chunks_only(|test_harness| async move { - let TestHarness { mut virtual_overseer } = test_harness; - + test_harness_chunks_only(|mut virtual_overseer| async move { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -1096,5 +1085,6 @@ fn does_not_query_local_validator() { ).await; assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); + virtual_overseer }); } diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index ccdaf4dc79..417135a5a2 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -1365,12 +1365,14 @@ mod tests { Box::new(oracle) } + type VirtualOverseer = TestSubsystemContextHandle; + struct TestHarness { network_handle: TestNetworkHandle, - virtual_overseer: TestSubsystemContextHandle, + virtual_overseer: VirtualOverseer, } - fn test_harness>( + fn test_harness>( sync_oracle: Box, test: impl FnOnce(TestHarness) -> T, ) { @@ -1402,7 +1404,10 @@ mod tests { futures::pin_mut!(test_fut); futures::pin_mut!(network_bridge); - let _ = executor::block_on(future::select(test_fut, network_bridge)); + let _ = executor::block_on(future::join(async move { + let mut virtual_overseer = test_fut.await; + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }, network_bridge)); } async fn assert_sends_validation_event_to_all( @@ -1494,6 +1499,7 @@ mod tests { ).encode(), ), ); + virtual_overseer }); } @@ -1585,6 +1591,7 @@ mod tests { wire_message.clone(), ), ); + virtual_overseer }); } @@ -1688,6 +1695,7 @@ mod tests { ), ); } + virtual_overseer }); } @@ -1754,6 +1762,7 @@ mod tests { wire_message.clone(), ), ); + virtual_overseer }); } @@ -1796,6 +1805,7 @@ mod tests { NetworkBridgeEvent::PeerViewChange(peer.clone(), view), &mut virtual_overseer, ).await; + virtual_overseer }); } @@ -1863,6 +1873,7 @@ mod tests { NetworkBridgeEvent::PeerDisconnected(peer), &mut virtual_overseer, ).await; + virtual_overseer }); } @@ -1938,6 +1949,7 @@ mod tests { wire_message.clone(), ), ); + virtual_overseer }); } @@ -2026,6 +2038,7 @@ mod tests { assert_eq!(m, collator_protocol_message); } ); + virtual_overseer }); } @@ -2091,6 +2104,7 @@ mod tests { NetworkBridgeEvent::PeerViewChange(peer.clone(), view_b.clone()), &mut virtual_overseer, ).await; + virtual_overseer }); } @@ -2136,13 +2150,14 @@ mod tests { wire_message.clone(), ), ); + virtual_overseer }); } #[test] fn view_finalized_number_can_not_go_down() { test_harness(done_syncing_oracle(), |test_harness| async move { - let TestHarness { mut network_handle, .. } = test_harness; + let TestHarness { mut network_handle, virtual_overseer } = test_harness; let peer_a = PeerId::random(); @@ -2176,6 +2191,7 @@ mod tests { MALFORMED_VIEW_COST, ), ); + virtual_overseer }); } @@ -2279,6 +2295,7 @@ mod tests { ) ); } + virtual_overseer }); } @@ -2361,6 +2378,7 @@ mod tests { NetworkBridgeEvent::OurViewChange(our_view), &mut virtual_overseer, ).await; + virtual_overseer }); } } diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 012f81504e..a8f1dcf3a2 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -1139,7 +1139,7 @@ mod tests { virtual_overseer: VirtualOverseer, } - fn test_harness>( + fn test_harness>( local_peer_id: PeerId, collator_pair: CollatorPair, test: impl FnOnce(TestHarness) -> T, @@ -1155,13 +1155,16 @@ mod tests { futures::pin_mut!(test_fut); futures::pin_mut!(subsystem); - executor::block_on(future::select(test_fut, subsystem)); + executor::block_on(future::join(async move { + let mut overseer = test_fut.await; + overseer_signal(&mut overseer, OverseerSignal::Conclude).await; + }, subsystem)).1.unwrap(); } const TIMEOUT: Duration = Duration::from_millis(100); async fn overseer_send( - overseer: &mut test_helpers::TestSubsystemContextHandle, + overseer: &mut VirtualOverseer, msg: CollatorProtocolMessage, ) { tracing::trace!(?msg, "sending message"); @@ -1173,7 +1176,7 @@ mod tests { } async fn overseer_recv( - overseer: &mut test_helpers::TestSubsystemContextHandle, + overseer: &mut VirtualOverseer, ) -> AllMessages { let msg = overseer_recv_with_timeout(overseer, TIMEOUT) .await @@ -1185,7 +1188,7 @@ mod tests { } async fn overseer_recv_with_timeout( - overseer: &mut test_helpers::TestSubsystemContextHandle, + overseer: &mut VirtualOverseer, timeout: Duration, ) -> Option { tracing::trace!("waiting for message..."); @@ -1196,7 +1199,7 @@ mod tests { } async fn overseer_signal( - overseer: &mut test_helpers::TestSubsystemContextHandle, + overseer: &mut VirtualOverseer, signal: OverseerSignal, ) { overseer @@ -1570,6 +1573,7 @@ mod tests { ).await; expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await; + virtual_overseer }); } @@ -1589,6 +1593,7 @@ mod tests { // A validator connected to us connect_peer(&mut virtual_overseer, peer.clone()).await; expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; + virtual_overseer }) } @@ -1632,6 +1637,7 @@ mod tests { // After changing the view we should receive the advertisement expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await; + virtual_overseer }) } @@ -1678,7 +1684,9 @@ mod tests { expect_advertise_collation_msg(&mut virtual_overseer, &peer, old_relay_parent).await; send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await; + expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await; + virtual_overseer }) } @@ -1714,6 +1722,7 @@ mod tests { send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await; assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none()); + virtual_overseer }) } @@ -1756,6 +1765,7 @@ mod tests { PeerSet::Collation, )) if p == peer ); + virtual_overseer }) } } diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side.rs index 81a904fe69..7698f28dd1 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side.rs @@ -1325,7 +1325,7 @@ mod tests { virtual_overseer: VirtualOverseer, } - fn test_harness>(test: impl FnOnce(TestHarness) -> T) { + fn test_harness>(test: impl FnOnce(TestHarness) -> T) { let _ = env_logger::builder() .is_test(true) .filter( @@ -1363,7 +1363,10 @@ mod tests { futures::pin_mut!(test_fut); futures::pin_mut!(subsystem); - executor::block_on(future::select(test_fut, subsystem)); + executor::block_on(future::join(async move { + let mut overseer = test_fut.await; + overseer_signal(&mut overseer, OverseerSignal::Conclude).await; + }, subsystem)).1.unwrap(); } const TIMEOUT: Duration = Duration::from_millis(200); @@ -1403,6 +1406,17 @@ mod tests { .await } + async fn overseer_signal( + overseer: &mut VirtualOverseer, + signal: OverseerSignal, + ) { + overseer + .send(FromOverseer::Signal(signal)) + .timeout(TIMEOUT) + .await + .expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT)); + } + async fn respond_to_core_info_queries( virtual_overseer: &mut VirtualOverseer, test_state: &TestState, @@ -1513,6 +1527,7 @@ mod tests { assert_eq!(para_id, test_state.chain_ids[0]); assert_eq!(collator, pair.public()); }); + virtual_overseer }); } @@ -1615,6 +1630,7 @@ mod tests { assert_eq!(rep, BENEFIT_NOTIFY_GOOD); } ); + virtual_overseer }); } @@ -1664,6 +1680,7 @@ mod tests { assert_eq!(rep, COST_INVALID_SIGNATURE); } ); + virtual_overseer }); } @@ -1883,6 +1900,7 @@ mod tests { assert_eq!(collation_0.0, candidate_a); assert_eq!(collation_1.0, candidate_b); + virtual_overseer }); } @@ -1970,7 +1988,8 @@ mod tests { assert_eq!(peer, peer_b); assert_eq!(peer_set, PeerSet::Collation); } - ) + ); + virtual_overseer }); } @@ -2119,7 +2138,8 @@ mod tests { assert_eq!(peer, peer_b); assert_eq!(peer_set, PeerSet::Collation); } - ) + ); + virtual_overseer }); } @@ -2162,7 +2182,8 @@ mod tests { assert_eq!(peer, peer_b); assert_eq!(peer_set, PeerSet::Collation); } - ) + ); + virtual_overseer }) } @@ -2233,6 +2254,7 @@ mod tests { assert_eq!(peer_set, PeerSet::Collation); } ); + virtual_overseer }) } @@ -2304,6 +2326,7 @@ mod tests { assert_eq!(peer_set, PeerSet::Collation); } ); + virtual_overseer }) } } diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 4470ebdcbf..9425c59ac2 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -1889,11 +1889,12 @@ mod tests { assert_eq!(s, statement); } ); + handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; }; futures::pin_mut!(test_fut); futures::pin_mut!(bg); - executor::block_on(future::select(test_fut, bg)); + executor::block_on(future::join(test_fut, bg)); } }