diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index d921606ea6..baf1c2e0cc 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -1025,63 +1025,74 @@ ServiceBuilder< let spawn_handle = task_manager.spawn_handle(); + // Inform the tx pool about imported and finalized blocks. { - // block notifications let txpool = Arc::downgrade(&transaction_pool); + + let mut import_stream = client.import_notification_stream().map(|n| ChainEvent::NewBlock { + id: BlockId::Hash(n.hash), + header: n.header, + retracted: n.retracted, + is_new_best: n.is_new_best, + }).fuse(); + let mut finality_stream = client.finality_notification_stream() + .map(|n| ChainEvent::Finalized:: { hash: n.hash }) + .fuse(); + + let events = async move { + loop { + let evt = futures::select! { + evt = import_stream.next() => evt, + evt = finality_stream.next() => evt, + complete => return, + }; + + let txpool = txpool.upgrade(); + if let Some((txpool, evt)) = txpool.and_then(|tp| evt.map(|evt| (tp, evt))) { + txpool.maintain(evt).await; + } + } + }; + + spawn_handle.spawn( + "txpool-notifications", + events, + ); + } + + // Inform the offchain worker about new imported blocks + { let offchain = offchain_workers.as_ref().map(Arc::downgrade); let notifications_spawn_handle = task_manager.spawn_handle(); let network_state_info: Arc = network.clone(); let is_validator = config.role.is_authority(); - let (import_stream, finality_stream) = ( - client.import_notification_stream().map(|n| ChainEvent::NewBlock { - id: BlockId::Hash(n.hash), - header: n.header, - retracted: n.retracted, - is_new_best: n.is_new_best, - }), - client.finality_notification_stream().map(|n| ChainEvent::Finalized { - hash: n.hash - }) - ); - let events = futures::stream::select(import_stream, finality_stream) - .for_each(move |event| { - // offchain worker is only interested in block import events - if let ChainEvent::NewBlock { ref header, is_new_best, .. } = event { - let offchain = offchain.as_ref().and_then(|o| o.upgrade()); - match offchain { - Some(offchain) if is_new_best => { - notifications_spawn_handle.spawn( - "offchain-on-block", - offchain.on_block_imported( - &header, - network_state_info.clone(), - is_validator, - ), - ); - }, - Some(_) => log::debug!( - target: "sc_offchain", - "Skipping offchain workers for non-canon block: {:?}", - header, - ), - _ => {}, - } - }; - - let txpool = txpool.upgrade(); - if let Some(txpool) = txpool.as_ref() { + let events = client.import_notification_stream().for_each(move |n| { + let offchain = offchain.as_ref().and_then(|o| o.upgrade()); + match offchain { + Some(offchain) if n.is_new_best => { notifications_spawn_handle.spawn( - "txpool-maintain", - txpool.maintain(event), + "offchain-on-block", + offchain.on_block_imported( + &n.header, + network_state_info.clone(), + is_validator, + ), ); - } + }, + Some(_) => log::debug!( + target: "sc_offchain", + "Skipping offchain workers for non-canon block: {:?}", + n.header, + ), + _ => {}, + } - ready(()) - }); + ready(()) + }); spawn_handle.spawn( - "txpool-and-offchain-notif", + "offchain-notifications", events, ); }