Expose that BasicQueue expects blocking spawn (#5860)

* Expose that `BasicQueue` expects blocking spawn

Up to now `BasicQueue` expected a closure that to spawn a `Future`.
This was expected to be a closure that spawns a blocking future.
However, this wasn't documented anywhere. This pr introduces a new trait
`SpawnBlocking` that exposes this requirement to the outside.

* Feedback
This commit is contained in:
Bastian Köcher
2020-05-04 19:40:29 +02:00
committed by GitHub
parent 8549cf5899
commit 9c5536e01a
15 changed files with 90 additions and 48 deletions
+1
View File
@@ -6205,6 +6205,7 @@ dependencies = [
"serde",
"sp-blockchain",
"sp-consensus",
"sp-core",
"sp-inherents",
"sp-runtime",
"sp-transaction-pool",
@@ -27,7 +27,7 @@ macro_rules! new_full_start {
($config:expr) => {{
use std::sync::Arc;
use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;
let mut import_setup = None;
let inherent_data_providers = sp_inherents::InherentDataProviders::new();
@@ -45,15 +45,16 @@ macro_rules! new_full_start {
let select_chain = select_chain.take()
.ok_or_else(|| sc_service::Error::SelectChainRequired)?;
let (grandpa_block_import, grandpa_link) =
sc_finality_grandpa::block_import(client.clone(), &(client.clone() as Arc<_>), select_chain)?;
let (grandpa_block_import, grandpa_link) = sc_finality_grandpa::block_import(
client.clone(),
&(client.clone() as Arc<_>),
select_chain,
)?;
let aura_block_import = sc_consensus_aura::AuraBlockImport::<_, _, _, AuraPair>::new(
grandpa_block_import.clone(), client.clone(),
);
let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);
let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>(
sc_consensus_aura::slot_duration(&*client)?,
aura_block_import,
@@ -61,7 +62,7 @@ macro_rules! new_full_start {
None,
client,
inherent_data_providers.clone(),
spawner,
spawn_task_handle,
)?;
import_setup = Some((grandpa_block_import, grandpa_link));
@@ -208,8 +209,6 @@ pub fn new_light(config: Configuration) -> Result<impl AbstractService, ServiceE
let finality_proof_request_builder =
finality_proof_import.create_finality_proof_request_builder();
let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);
let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>(
sc_consensus_aura::slot_duration(&*client)?,
grandpa_block_import,
@@ -217,7 +216,7 @@ pub fn new_light(config: Configuration) -> Result<impl AbstractService, ServiceE
Some(Box::new(finality_proof_import)),
client,
inherent_data_providers.clone(),
spawner,
spawn_task_handle,
)?;
Ok((import_queue, finality_proof_request_builder))
+7 -7
View File
@@ -50,7 +50,11 @@ macro_rules! new_full_start {
})?
.with_transaction_pool(|config, client, _fetcher, prometheus_registry| {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
Ok(sc_transaction_pool::BasicPool::new(
config,
std::sync::Arc::new(pool_api),
prometheus_registry,
))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool, spawn_task_handle| {
let select_chain = select_chain.take()
@@ -68,8 +72,6 @@ macro_rules! new_full_start {
client.clone(),
)?;
let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);
let import_queue = sc_consensus_babe::import_queue(
babe_link.clone(),
block_import.clone(),
@@ -77,7 +79,7 @@ macro_rules! new_full_start {
None,
client,
inherent_data_providers.clone(),
spawner,
spawn_task_handle,
)?;
import_setup = Some((block_import, grandpa_link, babe_link));
@@ -308,8 +310,6 @@ pub fn new_light(config: Configuration)
client.clone(),
)?;
let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);
let import_queue = sc_consensus_babe::import_queue(
babe_link,
babe_block_import,
@@ -317,7 +317,7 @@ pub fn new_light(config: Configuration)
Some(Box::new(finality_proof_import)),
client.clone(),
inherent_data_providers.clone(),
spawner,
spawn_task_handle,
)?;
Ok((import_queue, finality_proof_request_builder))
+5 -4
View File
@@ -33,7 +33,7 @@ use std::{
collections::HashMap
};
use futures::{prelude::*, future::BoxFuture};
use futures::prelude::*;
use parking_lot::Mutex;
use log::{debug, info, trace};
@@ -788,14 +788,14 @@ impl<Block: BlockT, C, I, P> BlockImport<Block> for AuraBlockImport<Block, C, I,
}
/// Start an import queue for the Aura consensus algorithm.
pub fn import_queue<B, I, C, P, F>(
pub fn import_queue<B, I, C, P, S>(
slot_duration: SlotDuration,
block_import: I,
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
client: Arc<C>,
inherent_data_providers: InherentDataProviders,
spawner: F,
spawner: &S,
) -> Result<AuraImportQueue<B, sp_api::TransactionFor<C, B>>, sp_consensus::Error> where
B: BlockT,
C::Api: BlockBuilderApi<B> + AuraApi<B, AuthorityId<P>> + ApiExt<B, Error = sp_blockchain::Error>,
@@ -805,7 +805,7 @@ pub fn import_queue<B, I, C, P, F>(
P: Pair + Send + Sync + 'static,
P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode,
P::Signature: Encode + Decode,
F: Fn(BoxFuture<'static, ()>) -> (),
S: sp_core::traits::SpawnBlocking,
{
register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?;
initialize_authorities_cache(&*client)?;
@@ -815,6 +815,7 @@ pub fn import_queue<B, I, C, P, F>(
inherent_data_providers,
phantom: PhantomData,
};
Ok(BasicQueue::new(
verifier,
Box::new(block_import),
+2 -2
View File
@@ -107,7 +107,7 @@ use sc_client_api::{
};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use futures::{prelude::*, future::BoxFuture};
use futures::prelude::*;
use log::{debug, info, log, trace, warn};
use sc_consensus_slots::{
SlotWorker, SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation,
@@ -1272,7 +1272,7 @@ pub fn import_queue<Block: BlockT, Client, Inner>(
finality_proof_import: Option<BoxFinalityProofImport<Block>>,
client: Arc<Client>,
inherent_data_providers: InherentDataProviders,
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
spawner: &impl sp_core::traits::SpawnBlocking,
) -> ClientResult<BabeImportQueue<Block, sp_api::TransactionFor<Client, Block>>> where
Inner: BlockImport<Block, Error = ConsensusError, Transaction = sp_api::TransactionFor<Client, Block>>
+ Send + Sync + 'static,
@@ -28,6 +28,7 @@ sp-blockchain = { path = "../../../primitives/blockchain" , version = "2.0.0-dev
sp-consensus = { package = "sp-consensus", path = "../../../primitives/consensus/common" , version = "0.8.0-dev"}
sp-inherents = { path = "../../../primitives/inherents" , version = "2.0.0-dev"}
sp-runtime = { path = "../../../primitives/runtime" , version = "2.0.0-dev"}
sp-core = { path = "../../../primitives/core" , version = "2.0.0-dev"}
sp-transaction-pool = { path = "../../../primitives/transaction-pool" , version = "2.0.0-dev"}
[dev-dependencies]
@@ -17,7 +17,7 @@
//! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks.
//! This is suitable for a testing environment.
use futures::{prelude::*, future::BoxFuture};
use futures::prelude::*;
use sp_consensus::{
Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin, SelectChain,
import_queue::{BasicQueue, CacheKeyId, Verifier, BoxBlockImport},
@@ -68,7 +68,7 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier {
/// Instantiate the import queue for the manual seal consensus engine.
pub fn import_queue<Block, B>(
block_import: BoxBlockImport<Block, TransactionFor<B, Block>>,
spawner: impl Fn(BoxFuture<'static, ()>) -> ()
spawner: &impl sp_core::traits::SpawnBlocking,
) -> BasicQueue<Block, TransactionFor<B, Block>>
where
Block: BlockT,
+4 -3
View File
@@ -49,12 +49,13 @@ use sp_consensus::{
SelectChain, Error as ConsensusError, CanAuthorWith, RecordProof, BlockImport,
BlockCheckParams, ImportResult,
};
use sp_consensus::import_queue::{BoxBlockImport, BasicQueue, Verifier, BoxJustificationImport, BoxFinalityProofImport};
use sp_consensus::import_queue::{
BoxBlockImport, BasicQueue, Verifier, BoxJustificationImport, BoxFinalityProofImport,
};
use codec::{Encode, Decode};
use sc_client_api;
use log::*;
use sp_timestamp::{InherentError as TIError, TimestampInherentData};
use futures::future::BoxFuture;
#[derive(derive_more::Display, Debug)]
pub enum Error<B: BlockT> {
@@ -462,7 +463,7 @@ pub fn import_queue<B, Transaction, Algorithm>(
finality_proof_import: Option<BoxFinalityProofImport<B>>,
algorithm: Algorithm,
inherent_data_providers: InherentDataProviders,
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
spawner: &impl sp_core::traits::SpawnBlocking,
) -> Result<
PowImportQueue<B, Transaction>,
sp_consensus::Error
@@ -80,15 +80,12 @@ fn build_test_full_node(config: config::NetworkConfiguration)
}
}
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);
let import_queue = Box::new(sp_consensus::import_queue::BasicQueue::new(
PassThroughVerifier(false),
Box::new(client.clone()),
None,
None,
spawner,
&sp_core::testing::SpawnBlockingExecutor::new(),
));
let worker = NetworkWorker::new(config::Params {
@@ -81,13 +81,18 @@ fn import_single_good_block_without_header_fails() {
#[test]
fn async_import_queue_drops() {
let executor = sp_core::testing::SpawnBlockingExecutor::new();
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = PassThroughVerifier(true);
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);
let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None, spawner);
let queue = BasicQueue::new(
verifier,
Box::new(substrate_test_runtime_client::new()),
None,
None,
&executor,
);
drop(queue);
}
}
+2 -8
View File
@@ -606,15 +606,12 @@ pub trait TestNetFactory: Sized {
);
let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);
let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
spawner,
&sp_core::testing::SpawnBlockingExecutor::new(),
));
let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
@@ -687,15 +684,12 @@ pub trait TestNetFactory: Sized {
);
let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));
let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);
let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
spawner,
&sp_core::testing::SpawnBlockingExecutor::new(),
));
let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
+14 -3
View File
@@ -15,10 +15,10 @@
use std::{panic, pin::Pin, result::Result, sync::Arc};
use exit_future::Signal;
use log::{debug};
use log::debug;
use futures::{
Future, FutureExt,
future::{select, Either},
future::{select, Either, BoxFuture},
compat::*,
task::{Spawn, FutureObj, SpawnError},
};
@@ -62,7 +62,12 @@ impl SpawnTaskHandle {
}
/// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`.
fn spawn_inner(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static, task_type: TaskType) {
fn spawn_inner(
&self,
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
task_type: TaskType,
) {
let on_exit = self.on_exit.clone();
let metrics = self.metrics.clone();
@@ -119,6 +124,12 @@ impl Spawn for SpawnTaskHandle {
}
}
impl sp_core::traits::SpawnBlocking for SpawnTaskHandle {
fn spawn_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>) {
self.spawn_blocking(name, future);
}
}
impl sc_client_api::CloneableSpawn for SpawnTaskHandle {
fn clone(&self) -> Box<dyn CloneableSpawn> {
Box::new(Clone::clone(self))
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{mem, pin::Pin, time::Duration, marker::PhantomData};
use futures::{prelude::*, task::Context, task::Poll, future::BoxFuture};
use futures::{prelude::*, task::Context, task::Poll};
use futures_timer::Delay;
use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}};
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
@@ -56,7 +56,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
block_import: BoxBlockImport<B, Transaction>,
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
spawner: &impl sp_core::traits::SpawnBlocking,
) -> Self {
let (result_sender, result_port) = buffered_link::buffered_link();
let (future, worker_sender) = BlockImportWorker::new(
@@ -67,7 +67,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
finality_proof_import,
);
spawner(future.boxed());
spawner.spawn_blocking("basic-block-import-worker", future.boxed());
Self {
sender: worker_sender,
+24
View File
@@ -290,6 +290,30 @@ macro_rules! wasm_export_functions {
};
}
/// An executor that supports spawning blocking futures in tests.
///
/// Internally this just wraps a `ThreadPool` with a pool size of `8`. This
/// should ensure that we have enough threads in tests for spawning blocking futures.
#[cfg(feature = "std")]
#[derive(Clone)]
pub struct SpawnBlockingExecutor(futures::executor::ThreadPool);
#[cfg(feature = "std")]
impl SpawnBlockingExecutor {
/// Create a new instance of `Self`.
pub fn new() -> Self {
let mut builder = futures::executor::ThreadPoolBuilder::new();
Self(builder.pool_size(8).create().expect("Failed to create thread pool"))
}
}
#[cfg(feature = "std")]
impl crate::traits::SpawnBlocking for SpawnBlockingExecutor {
fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
self.0.spawn_ok(future);
}
}
#[cfg(test)]
mod tests {
use super::*;
+8
View File
@@ -330,3 +330,11 @@ impl TaskExecutorExt {
Self(spawn_handle)
}
}
/// Something that can spawn a blocking future.
pub trait SpawnBlocking {
/// Spawn the given blocking future.
///
/// The given `name` is used to identify the future in tracing.
fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>);
}