diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 64bdc6db75..464f8d3226 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -201,33 +201,36 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future {}, + _ = run_prepare_queue.fuse() => {}, + _ = run_prepare_pool.fuse() => {}, + _ = run_execute_queue.fuse() => {}, + _ = run_sweeper.fuse() => {}, + }; + }; + + (validation_host, task) } /// An execution request that should execute the PVF (known in the context) and send the results @@ -297,15 +300,18 @@ async fn run( mut to_sweeper_tx, mut awaiting_prepare, }: Inner, - prepare_pool: impl Future + Unpin, - prepare_queue: impl Future + Unpin, - execute_queue: impl Future + Unpin, - sweeper: impl Future + Unpin, ) { macro_rules! break_if_fatal { ($expr:expr) => { match $expr { - Err(Fatal) => break, + Err(Fatal) => { + gum::error!( + target: LOG_TARGET, + "Fatal error occurred, terminating the host. Line: {}", + line!(), + ); + break + }, Ok(v) => v, } }; @@ -317,31 +323,9 @@ async fn run( let mut to_host_rx = to_host_rx.fuse(); let mut from_prepare_queue_rx = from_prepare_queue_rx.fuse(); - // Make sure that the task-futures are fused. - let mut prepare_queue = prepare_queue.fuse(); - let mut prepare_pool = prepare_pool.fuse(); - let mut execute_queue = execute_queue.fuse(); - let mut sweeper = sweeper.fuse(); - loop { // biased to make it behave deterministically for tests. futures::select_biased! { - _ = prepare_queue => { - never!("prepare_pool: long-running task never concludes; qed"); - break; - }, - _ = prepare_pool => { - never!("prepare_pool: long-running task never concludes; qed"); - break; - }, - _ = execute_queue => { - never!("execute_queue: long-running task never concludes; qed"); - break; - }, - _ = sweeper => { - never!("sweeper: long-running task never concludes; qed"); - break; - }, () = cleanup_pulse.select_next_some() => { // `select_next_some` because we don't expect this to fail, but if it does, we // still don't fail. The tradeoff is that the compiled cache will start growing @@ -356,7 +340,14 @@ async fn run( ).await); }, to_host = to_host_rx.next() => { - let to_host = break_if_fatal!(to_host.ok_or(Fatal)); + let to_host = match to_host { + None => { + // The sending half of the channel has been closed, meaning the + // `ValidationHost` struct was dropped. Shutting down gracefully. + break; + }, + Some(to_host) => to_host, + }; break_if_fatal!(handle_to_host( &cache_path, @@ -761,26 +752,18 @@ mod tests { let (to_execute_queue_tx, to_execute_queue_rx) = mpsc::channel(10); let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(10); - let mk_dummy_loop = || std::future::pending().boxed(); - - let run = run( - Inner { - cache_path, - cleanup_pulse_interval, - artifact_ttl, - artifacts, - to_host_rx, - to_prepare_queue_tx, - from_prepare_queue_rx, - to_execute_queue_tx, - to_sweeper_tx, - awaiting_prepare: AwaitingPrepare::default(), - }, - mk_dummy_loop(), - mk_dummy_loop(), - mk_dummy_loop(), - mk_dummy_loop(), - ) + let run = run(Inner { + cache_path, + cleanup_pulse_interval, + artifact_ttl, + artifacts, + to_host_rx, + to_prepare_queue_tx, + from_prepare_queue_rx, + to_execute_queue_tx, + to_sweeper_tx, + awaiting_prepare: AwaitingPrepare::default(), + }) .boxed(); Self { diff --git a/polkadot/node/core/pvf/tests/it/adder.rs b/polkadot/node/core/pvf/tests/it/adder.rs index 147af508cc..83cbd27b6e 100644 --- a/polkadot/node/core/pvf/tests/it/adder.rs +++ b/polkadot/node/core/pvf/tests/it/adder.rs @@ -112,3 +112,33 @@ async fn execute_bad_on_parent() { .await .unwrap_err(); } + +#[async_std::test] +async fn stress_spawn() { + let host = std::sync::Arc::new(TestHost::new()); + + async fn execute(host: std::sync::Arc) { + let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; + let block_data = BlockData { state: 0, add: 512 }; + let ret = host + .validate_candidate( + adder::wasm_binary_unwrap(), + ValidationParams { + parent_head: GenericHeadData(parent_head.encode()), + block_data: GenericBlockData(block_data.encode()), + relay_parent_number: 1, + relay_parent_storage_root: Default::default(), + }, + ) + .await + .unwrap(); + + let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap(); + + assert_eq!(new_head.number, 1); + assert_eq!(new_head.parent_hash, parent_head.hash()); + assert_eq!(new_head.post_state, hash_state(512)); + } + + futures::future::join_all((0..100).map(|_| execute(host.clone()))).await; +}