mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 03:31:10 +00:00
node: spawn block authoring and grandpa voter as blocking tasks (#6446)
* service: add spawner for essential tasks * node: spawn block authoring and grandpa voter as blocking tasks * Apply suggestions from code review Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
@@ -142,7 +142,7 @@ pub fn new_full(config: Configuration) -> Result<impl AbstractService, ServiceEr
|
||||
|
||||
// the AURA authoring task is considered essential, i.e. if it
|
||||
// fails we take down the service with it.
|
||||
service.spawn_essential_task("aura", aura);
|
||||
service.spawn_essential_task_handle().spawn_blocking("aura", aura);
|
||||
}
|
||||
|
||||
// if the node isn't actively participating in consensus then it doesn't
|
||||
@@ -184,7 +184,7 @@ pub fn new_full(config: Configuration) -> Result<impl AbstractService, ServiceEr
|
||||
|
||||
// the GRANDPA voter task is considered infallible, i.e.
|
||||
// if it fails we take down the service with it.
|
||||
service.spawn_essential_task(
|
||||
service.spawn_essential_task_handle().spawn_blocking(
|
||||
"grandpa-voter",
|
||||
sc_finality_grandpa::run_grandpa_voter(grandpa_config)?
|
||||
);
|
||||
|
||||
@@ -217,7 +217,7 @@ macro_rules! new_full {
|
||||
};
|
||||
|
||||
let babe = sc_consensus_babe::start_babe(babe_config)?;
|
||||
service.spawn_essential_task("babe-proposer", babe);
|
||||
service.spawn_essential_task_handle().spawn_blocking("babe-proposer", babe);
|
||||
}
|
||||
|
||||
// Spawn authority discovery module.
|
||||
@@ -250,7 +250,7 @@ macro_rules! new_full {
|
||||
service.prometheus_registry(),
|
||||
);
|
||||
|
||||
service.spawn_task("authority-discovery", authority_discovery);
|
||||
service.spawn_task_handle().spawn("authority-discovery", authority_discovery);
|
||||
}
|
||||
|
||||
// if the node isn't actively participating in consensus then it doesn't
|
||||
@@ -292,7 +292,7 @@ macro_rules! new_full {
|
||||
|
||||
// the GRANDPA voter task is considered infallible, i.e.
|
||||
// if it fails we take down the service with it.
|
||||
service.spawn_essential_task(
|
||||
service.spawn_essential_task_handle().spawn_blocking(
|
||||
"grandpa-voter",
|
||||
grandpa::run_grandpa_voter(grandpa_config)?
|
||||
);
|
||||
|
||||
@@ -82,7 +82,7 @@ pub use sc_network::config::{
|
||||
TransactionImportFuture,
|
||||
};
|
||||
pub use sc_tracing::TracingReceiver;
|
||||
pub use task_manager::SpawnTaskHandle;
|
||||
pub use task_manager::{SpawnEssentialTaskHandle, SpawnTaskHandle};
|
||||
use task_manager::TaskManager;
|
||||
use sp_blockchain::{HeaderBackend, HeaderMetadata};
|
||||
use sp_api::{ApiExt, ConstructRuntimeApi, ApiErrorExt};
|
||||
@@ -166,13 +166,19 @@ pub trait AbstractService: Future<Output = Result<(), Error>> + Send + Unpin + S
|
||||
/// The task name is a `&'static str` as opposed to a `String`. The reason for that is that
|
||||
/// in order to avoid memory consumption issues with the Prometheus metrics, the set of
|
||||
/// possible task names has to be bounded.
|
||||
#[deprecated(note = "Use `spawn_task_handle().spawn() instead.")]
|
||||
fn spawn_task(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static);
|
||||
|
||||
/// Spawns a task in the background that runs the future passed as
|
||||
/// parameter. The given task is considered essential, i.e. if it errors we
|
||||
/// trigger a service exit.
|
||||
#[deprecated(note = "Use `spawn_essential_task_handle().spawn() instead.")]
|
||||
fn spawn_essential_task(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static);
|
||||
|
||||
/// Returns a handle for spawning essential tasks. Any task spawned through this handle is
|
||||
/// considered essential, i.e. if it errors we trigger a service exit.
|
||||
fn spawn_essential_task_handle(&self) -> SpawnEssentialTaskHandle;
|
||||
|
||||
/// Returns a handle for spawning tasks.
|
||||
fn spawn_task_handle(&self) -> SpawnTaskHandle;
|
||||
|
||||
@@ -269,13 +275,20 @@ where
|
||||
let _ = essential_failed.send(());
|
||||
});
|
||||
|
||||
let _ = self.spawn_task(name, essential_task);
|
||||
let _ = self.spawn_task_handle().spawn(name, essential_task);
|
||||
}
|
||||
|
||||
fn spawn_task_handle(&self) -> SpawnTaskHandle {
|
||||
self.task_manager.spawn_handle()
|
||||
}
|
||||
|
||||
fn spawn_essential_task_handle(&self) -> SpawnEssentialTaskHandle {
|
||||
SpawnEssentialTaskHandle::new(
|
||||
self.essential_failed_tx.clone(),
|
||||
self.task_manager.spawn_handle(),
|
||||
)
|
||||
}
|
||||
|
||||
fn rpc_query(&self, mem: &RpcSession, request: &str) -> Pin<Box<dyn Future<Output = Option<String>> + Send>> {
|
||||
Box::pin(
|
||||
self.rpc_handlers.handle_request(request, mem.metadata.clone())
|
||||
|
||||
@@ -28,6 +28,7 @@ use prometheus_endpoint::{
|
||||
CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64
|
||||
};
|
||||
use sc_client_api::CloneableSpawn;
|
||||
use sp_utils::mpsc::TracingUnboundedSender;
|
||||
use crate::config::TaskType;
|
||||
|
||||
mod prometheus_future;
|
||||
@@ -149,6 +150,64 @@ impl futures01::future::Executor<Boxed01Future01> for SpawnTaskHandle {
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper over `SpawnTaskHandle` that will notify a receiver whenever any
|
||||
/// task spawned through it fails. The service should be on the receiver side
|
||||
/// and will shut itself down whenever it receives any message, i.e. an
|
||||
/// essential task has failed.
|
||||
pub struct SpawnEssentialTaskHandle {
|
||||
essential_failed_tx: TracingUnboundedSender<()>,
|
||||
inner: SpawnTaskHandle,
|
||||
}
|
||||
|
||||
impl SpawnEssentialTaskHandle {
|
||||
/// Creates a new `SpawnEssentialTaskHandle`.
|
||||
pub fn new(
|
||||
essential_failed_tx: TracingUnboundedSender<()>,
|
||||
spawn_task_handle: SpawnTaskHandle,
|
||||
) -> SpawnEssentialTaskHandle {
|
||||
SpawnEssentialTaskHandle {
|
||||
essential_failed_tx,
|
||||
inner: spawn_task_handle,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns the given task with the given name.
|
||||
///
|
||||
/// See also [`SpawnTaskHandle::spawn`].
|
||||
pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
|
||||
self.spawn_inner(name, task, TaskType::Async)
|
||||
}
|
||||
|
||||
/// Spawns the blocking task with the given name.
|
||||
///
|
||||
/// See also [`SpawnTaskHandle::spawn_blocking`].
|
||||
pub fn spawn_blocking(
|
||||
&self,
|
||||
name: &'static str,
|
||||
task: impl Future<Output = ()> + Send + 'static,
|
||||
) {
|
||||
self.spawn_inner(name, task, TaskType::Blocking)
|
||||
}
|
||||
|
||||
fn spawn_inner(
|
||||
&self,
|
||||
name: &'static str,
|
||||
task: impl Future<Output = ()> + Send + 'static,
|
||||
task_type: TaskType,
|
||||
) {
|
||||
use futures::sink::SinkExt;
|
||||
let mut essential_failed = self.essential_failed_tx.clone();
|
||||
let essential_task = std::panic::AssertUnwindSafe(task)
|
||||
.catch_unwind()
|
||||
.map(move |_| {
|
||||
log::error!("Essential task `{}` failed. Shutting down service.", name);
|
||||
let _ = essential_failed.send(());
|
||||
});
|
||||
|
||||
let _ = self.inner.spawn_inner(name, essential_task, task_type);
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper struct to manage background/async tasks in Service.
|
||||
pub struct TaskManager {
|
||||
/// A future that resolves when the service has exited, this is useful to
|
||||
|
||||
Reference in New Issue
Block a user