Ensure we spawn the block import worker as an essential task (#8155)

* Ensure we spawn the block import worker as an essential task

This pr ensures that we spawn the block import worker as an essential
task. This is quite important as we need to bring down the node when the
block import is done. Besides that it adds some debug output to the
block import worker.

* Don't be stupid :D
This commit is contained in:
Bastian Köcher
2021-02-19 17:31:03 +01:00
committed by GitHub
parent 16a27c28a9
commit 821e018d75
10 changed files with 75 additions and 13 deletions
@@ -73,7 +73,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen
Some(Box::new(grandpa_block_import.clone())),
client.clone(),
inherent_data_providers.clone(),
&task_manager.spawn_handle(),
&task_manager.spawn_essential_handle(),
config.prometheus_registry(),
sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()),
)?;
@@ -295,7 +295,7 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
Some(Box::new(grandpa_block_import)),
client.clone(),
InherentDataProviders::new(),
&task_manager.spawn_handle(),
&task_manager.spawn_essential_handle(),
config.prometheus_registry(),
sp_consensus::NeverCanAuthor,
)?;
+2 -2
View File
@@ -94,7 +94,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen
client.clone(),
select_chain.clone(),
inherent_data_providers.clone(),
&task_manager.spawn_handle(),
&task_manager.spawn_essential_handle(),
config.prometheus_registry(),
sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()),
)?;
@@ -405,7 +405,7 @@ pub fn new_light_base(mut config: Configuration) -> Result<(
client.clone(),
select_chain.clone(),
inherent_data_providers.clone(),
&task_manager.spawn_handle(),
&task_manager.spawn_essential_handle(),
config.prometheus_registry(),
sp_consensus::NeverCanAuthor,
)?;
+1 -1
View File
@@ -849,7 +849,7 @@ pub fn import_queue<B, I, C, P, S, CAW>(
P: Pair + Send + Sync + 'static,
P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode,
P::Signature: Encode + Decode,
S: sp_core::traits::SpawnNamed,
S: sp_core::traits::SpawnEssentialNamed,
CAW: CanAuthorWith<B> + Send + Sync + 'static,
{
register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?;
+1 -1
View File
@@ -1491,7 +1491,7 @@ pub fn import_queue<Block: BlockT, Client, SelectChain, Inner, CAW>(
client: Arc<Client>,
select_chain: SelectChain,
inherent_data_providers: InherentDataProviders,
spawner: &impl sp_core::traits::SpawnNamed,
spawner: &impl sp_core::traits::SpawnEssentialNamed,
registry: Option<&Registry>,
can_author_with: CAW,
) -> ClientResult<DefaultImportQueue<Block, Client>> where
@@ -73,7 +73,7 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier {
/// Instantiate the import queue for the manual seal consensus engine.
pub fn import_queue<Block, Transaction>(
block_import: BoxBlockImport<Block, Transaction>,
spawner: &impl sp_core::traits::SpawnNamed,
spawner: &impl sp_core::traits::SpawnEssentialNamed,
registry: Option<&Registry>,
) -> BasicQueue<Block, Transaction>
where
+1 -1
View File
@@ -505,7 +505,7 @@ pub fn import_queue<B, Transaction, Algorithm>(
justification_import: Option<BoxJustificationImport<B>>,
algorithm: Algorithm,
inherent_data_providers: InherentDataProviders,
spawner: &impl sp_core::traits::SpawnNamed,
spawner: &impl sp_core::traits::SpawnEssentialNamed,
registry: Option<&Registry>,
) -> Result<
PowImportQueue<B, Transaction>,
@@ -150,6 +150,7 @@ impl sp_core::traits::SpawnNamed for SpawnTaskHandle {
/// task spawned through it fails. The service should be on the receiver side
/// and will shut itself down whenever it receives any message, i.e. an
/// essential task has failed.
#[derive(Clone)]
pub struct SpawnEssentialTaskHandle {
essential_failed_tx: TracingUnboundedSender<()>,
inner: SpawnTaskHandle,
@@ -203,6 +204,16 @@ impl SpawnEssentialTaskHandle {
}
}
impl sp_core::traits::SpawnEssentialNamed for SpawnEssentialTaskHandle {
fn spawn_essential_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>) {
self.spawn_blocking(name, future);
}
fn spawn_essential(&self, name: &'static str, future: BoxFuture<'static, ()>) {
self.spawn(name, future);
}
}
/// Helper struct to manage background/async tasks in Service.
pub struct TaskManager {
/// A future that resolves when the service has exited, this is useful to
@@ -62,7 +62,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
verifier: V,
block_import: BoxBlockImport<B, Transaction>,
justification_import: Option<BoxJustificationImport<B>>,
spawner: &impl sp_core::traits::SpawnNamed,
spawner: &impl sp_core::traits::SpawnEssentialNamed,
prometheus_registry: Option<&Registry>,
) -> Self {
let (result_sender, result_port) = buffered_link::buffered_link();
@@ -83,7 +83,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
metrics,
);
spawner.spawn_blocking("basic-block-import-worker", future.boxed());
spawner.spawn_essential_blocking("basic-block-import-worker", future.boxed());
Self {
justification_sender,
@@ -164,7 +164,13 @@ async fn block_import_process<B: BlockT, Transaction: Send>(
loop {
let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await {
Some(blocks) => blocks,
None => return,
None => {
log::debug!(
target: "block-import",
"Stopping block import because the import channel was closed!",
);
return
},
};
let res = import_many_blocks(
@@ -236,6 +242,10 @@ impl<B: BlockT> BlockImportWorker<B> {
// If the results sender is closed, that means that the import queue is shutting
// down and we should end this future.
if worker.result_sender.is_closed() {
log::debug!(
target: "block-import",
"Stopping block import because result channel was closed!",
);
return;
}
@@ -244,7 +254,13 @@ impl<B: BlockT> BlockImportWorker<B> {
match justification {
Some(ImportJustification(who, hash, number, justification)) =>
worker.import_justification(who, hash, number, justification),
None => return,
None => {
log::debug!(
target: "block-import",
"Stopping block import because justification channel was closed!",
);
return
},
}
}
+10
View File
@@ -152,3 +152,13 @@ impl crate::traits::SpawnNamed for TaskExecutor {
self.0.spawn_ok(future);
}
}
#[cfg(feature = "std")]
impl crate::traits::SpawnEssentialNamed for TaskExecutor {
fn spawn_essential_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
self.0.spawn_ok(future);
}
fn spawn_essential(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
self.0.spawn_ok(future);
}
}
+26 -1
View File
@@ -205,7 +205,7 @@ sp_externalities::decl_extension! {
pub struct RuntimeSpawnExt(Box<dyn RuntimeSpawn>);
}
/// Something that can spawn futures (blocking and non-blocking) with an assigned name.
/// Something that can spawn tasks (blocking and non-blocking) with an assigned name.
#[dyn_clonable::clonable]
pub trait SpawnNamed: Clone + Send + Sync {
/// Spawn the given blocking future.
@@ -227,3 +227,28 @@ impl SpawnNamed for Box<dyn SpawnNamed> {
(**self).spawn(name, future)
}
}
/// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name.
///
/// Essential tasks are special tasks that should take down the node when they end.
#[dyn_clonable::clonable]
pub trait SpawnEssentialNamed: Clone + Send + Sync {
/// Spawn the given blocking future.
///
/// The given `name` is used to identify the future in tracing.
fn spawn_essential_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>);
/// Spawn the given non-blocking future.
///
/// The given `name` is used to identify the future in tracing.
fn spawn_essential(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>);
}
impl SpawnEssentialNamed for Box<dyn SpawnEssentialNamed> {
fn spawn_essential_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
(**self).spawn_essential_blocking(name, future)
}
fn spawn_essential(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
(**self).spawn_essential(name, future)
}
}