Pass an executor through the Configuration (#4688)

* Pass an executor through the Configuration

* Make tasks_executor mandatory

* Fix tests
This commit is contained in:
Pierre Krieger
2020-01-21 13:06:15 +01:00
committed by Bastian Köcher
parent b7a63e3b77
commit 169a48c0c5
8 changed files with 48 additions and 28 deletions
+5 -1
View File
@@ -1147,7 +1147,11 @@ ServiceBuilder<
essential_failed_rx,
to_spawn_tx,
to_spawn_rx,
to_poll: Vec::new(),
tasks_executor: if let Some(exec) = config.tasks_executor {
exec
} else {
return Err(Error::TasksExecutorRequired);
},
rpc_handlers,
_rpc: rpc,
_telemetry: telemetry,
+4 -2
View File
@@ -21,7 +21,7 @@ pub use sc_client_db::{kvdb::KeyValueDB, PruningMode};
pub use sc_network::config::{ExtTransport, NetworkConfiguration, Roles};
pub use sc_executor::WasmExecutionMethod;
use std::{path::{PathBuf, Path}, net::SocketAddr, sync::Arc};
use std::{future::Future, path::{PathBuf, Path}, pin::Pin, net::SocketAddr, sync::Arc};
pub use sc_transaction_pool::txpool::Options as TransactionPoolOptions;
use sc_chain_spec::{ChainSpec, RuntimeGenesis, Extension, NoExtension};
use sp_core::crypto::Protected;
@@ -29,7 +29,6 @@ use target_info::Target;
use sc_telemetry::TelemetryEndpoints;
/// Service configuration.
#[derive(Clone)]
pub struct Configuration<C, G, E = NoExtension> {
/// Implementation name
pub impl_name: &'static str,
@@ -39,6 +38,8 @@ pub struct Configuration<C, G, E = NoExtension> {
pub impl_commit: &'static str,
/// Node roles.
pub roles: Roles,
/// How to spawn background tasks. Mandatory, otherwise creating a `Service` will error.
pub tasks_executor: Option<Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>>,
/// Extrinsic pool configuration.
pub transaction_pool: TransactionPoolOptions,
/// Network configuration.
@@ -160,6 +161,7 @@ impl<C, G, E> Configuration<C, G, E> where
config_dir: config_dir.clone(),
name: Default::default(),
roles: Roles::FULL,
tasks_executor: None,
transaction_pool: Default::default(),
network: Default::default(),
keystore: KeystoreConfig::None,
+3
View File
@@ -40,6 +40,9 @@ pub enum Error {
/// Best chain selection strategy is missing.
#[display(fmt="Best chain selection strategy (SelectChain) is not provided.")]
SelectChainRequired,
/// Tasks executor is missing.
#[display(fmt="Tasks executor hasn't been provided.")]
TasksExecutorRequired,
/// Other error.
Other(String),
}
+4 -21
View File
@@ -38,7 +38,7 @@ use parking_lot::Mutex;
use sc_client::Client;
use exit_future::Signal;
use futures::{
Future, FutureExt, Stream, StreamExt, TryFutureExt,
Future, FutureExt, Stream, StreamExt,
future::select, channel::mpsc,
compat::*,
sink::SinkExt,
@@ -95,10 +95,8 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
to_spawn_tx: mpsc::UnboundedSender<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// Receiver for futures that must be spawned as background tasks.
to_spawn_rx: mpsc::UnboundedReceiver<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// List of futures to poll from `poll`.
/// If spawning a background task is not possible, we instead push the task into this `Vec`.
/// The elements must then be polled manually.
to_poll: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// How to spawn background tasks.
tasks_executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>,
rpc_handlers: sc_rpc_server::RpcHandler<sc_rpc::Metadata>,
_rpc: Box<dyn std::any::Any + Send + Sync>,
_telemetry: Option<sc_telemetry::Telemetry>,
@@ -322,22 +320,7 @@ impl<TBl: Unpin, TCl, TSc: Unpin, TNetStatus, TNet, TTxPool, TOc> Future for
}
while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) {
// TODO: Update to tokio 0.2 when libp2p get switched to std futures (#4383)
let executor = tokio_executor::DefaultExecutor::current();
use futures01::future::Executor;
if let Err(err) = executor.execute(task_to_spawn.unit_error().compat()) {
debug!(
target: "service",
"Failed to spawn background task: {:?}; falling back to manual polling",
err
);
this.to_poll.push(Box::pin(err.into_future().compat().map(drop)));
}
}
// Polling all the `to_poll` futures.
while let Some(pos) = this.to_poll.iter_mut().position(|t| Pin::new(t).poll(cx).is_ready()) {
let _ = this.to_poll.remove(pos);
(this.tasks_executor)(task_to_spawn);
}
// The service future never ends.