From f90f4854cdf0ea603e32d7902c95551f5eded844 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Wed, 3 Jun 2020 17:48:09 +0200 Subject: [PATCH] Split tx pool and offchain notification handling (#6231) Instead of having the tx pool and offchain worker being feed from the same import notification stream, this pr splits them to use two different streams. The first advantage of this split is that the tx pool will not be spawned anymore in another task and instead will directly process the notification in the same task. This has the advantage of being faster when the system is being under load, as the tx pool will not be waiting for being scheduled to handle the notification. --- substrate/client/service/src/builder.rs | 101 +++++++++++++----------- 1 file changed, 56 insertions(+), 45 deletions(-) diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index d921606ea6..baf1c2e0cc 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -1025,63 +1025,74 @@ ServiceBuilder< let spawn_handle = task_manager.spawn_handle(); + // Inform the tx pool about imported and finalized blocks. { - // block notifications let txpool = Arc::downgrade(&transaction_pool); + + let mut import_stream = client.import_notification_stream().map(|n| ChainEvent::NewBlock { + id: BlockId::Hash(n.hash), + header: n.header, + retracted: n.retracted, + is_new_best: n.is_new_best, + }).fuse(); + let mut finality_stream = client.finality_notification_stream() + .map(|n| ChainEvent::Finalized:: { hash: n.hash }) + .fuse(); + + let events = async move { + loop { + let evt = futures::select! { + evt = import_stream.next() => evt, + evt = finality_stream.next() => evt, + complete => return, + }; + + let txpool = txpool.upgrade(); + if let Some((txpool, evt)) = txpool.and_then(|tp| evt.map(|evt| (tp, evt))) { + txpool.maintain(evt).await; + } + } + }; + + spawn_handle.spawn( + "txpool-notifications", + events, + ); + } + + // Inform the offchain worker about new imported blocks + { let offchain = offchain_workers.as_ref().map(Arc::downgrade); let notifications_spawn_handle = task_manager.spawn_handle(); let network_state_info: Arc = network.clone(); let is_validator = config.role.is_authority(); - let (import_stream, finality_stream) = ( - client.import_notification_stream().map(|n| ChainEvent::NewBlock { - id: BlockId::Hash(n.hash), - header: n.header, - retracted: n.retracted, - is_new_best: n.is_new_best, - }), - client.finality_notification_stream().map(|n| ChainEvent::Finalized { - hash: n.hash - }) - ); - let events = futures::stream::select(import_stream, finality_stream) - .for_each(move |event| { - // offchain worker is only interested in block import events - if let ChainEvent::NewBlock { ref header, is_new_best, .. } = event { - let offchain = offchain.as_ref().and_then(|o| o.upgrade()); - match offchain { - Some(offchain) if is_new_best => { - notifications_spawn_handle.spawn( - "offchain-on-block", - offchain.on_block_imported( - &header, - network_state_info.clone(), - is_validator, - ), - ); - }, - Some(_) => log::debug!( - target: "sc_offchain", - "Skipping offchain workers for non-canon block: {:?}", - header, - ), - _ => {}, - } - }; - - let txpool = txpool.upgrade(); - if let Some(txpool) = txpool.as_ref() { + let events = client.import_notification_stream().for_each(move |n| { + let offchain = offchain.as_ref().and_then(|o| o.upgrade()); + match offchain { + Some(offchain) if n.is_new_best => { notifications_spawn_handle.spawn( - "txpool-maintain", - txpool.maintain(event), + "offchain-on-block", + offchain.on_block_imported( + &n.header, + network_state_info.clone(), + is_validator, + ), ); - } + }, + Some(_) => log::debug!( + target: "sc_offchain", + "Skipping offchain workers for non-canon block: {:?}", + n.header, + ), + _ => {}, + } - ready(()) - }); + ready(()) + }); spawn_handle.spawn( - "txpool-and-offchain-notif", + "offchain-notifications", events, ); }