Remove tokio dependencies (#2935)

* Remove dependencies on tokio

* Make service not depend on tokio

* Fix service tests

* Manually poll the import queue if failed to start

* Spawn all tasks at the end

* Remove executor from TelemetryOnConnect

* Remove TaskExecutor from offchain workers

* Remove TaskExecutor from AuthoritySetup

* Remove TaskExecutor from service

* Remove tokio dependency from RPC

* Remove finality-grandpa from WASM checks

* Fix offchain tests

* Line widths

* Fix RPC tests

* Fix service tests

* Fix bad futures polling

* Address some concerns

* Better error handling

* Is it the connectivity test that's not passing? I don't know, let's try

* Revert "Is it the connectivity test that's not passing? I don't know, let's try"

This reverts commit 28bbe51f0e2e4885fe1f901e11078604604cb212.

* Fix test
This commit is contained in:
Pierre Krieger
2019-06-26 17:21:17 +02:00
committed by Bastian Köcher
parent f69c48c7b8
commit 1b73b6532a
26 changed files with 287 additions and 154 deletions
+5 -8
View File
@@ -116,12 +116,12 @@ impl<B: Block> Link<B> for WaitLink {
}
}
/// Import blocks from a binary stream.
/// Returns a future that import blocks from a binary stream.
pub fn import_blocks<F, E, R>(
mut config: FactoryFullConfiguration<F>,
exit: E,
mut input: R
) -> error::Result<()>
) -> error::Result<impl Future<Item = (), Error = ()>>
where F: ServiceFactory, E: Future<Item=(),Error=()> + Send + 'static, R: Read,
{
let client = new_client::<F>(&config)?;
@@ -175,7 +175,7 @@ pub fn import_blocks<F, E, R>(
}
let mut link = WaitLink::new();
tokio::run(futures::future::poll_fn(move || {
Ok(futures::future::poll_fn(move || {
let blocks_before = link.imported_blocks;
queue.poll_actions(&mut link);
if link.imported_blocks / 1000 != blocks_before / 1000 {
@@ -186,15 +186,12 @@ pub fn import_blocks<F, E, R>(
);
}
if link.imported_blocks >= count {
info!("Imported {} blocks. Best: #{}", block_count, client.info().chain.best_number);
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}));
info!("Imported {} blocks. Best: #{}", block_count, client.info().chain.best_number);
Ok(())
}))
}
/// Revert the chain.
+30 -12
View File
@@ -18,7 +18,6 @@
use std::{sync::Arc, net::SocketAddr, ops::Deref, ops::DerefMut};
use serde::{Serialize, de::DeserializeOwned};
use tokio::runtime::TaskExecutor;
use crate::chain_spec::ChainSpec;
use client_db;
use client::{self, Client, runtime_api};
@@ -34,7 +33,7 @@ use crate::config::Configuration;
use primitives::{Blake2Hasher, H256};
use rpc::{self, apis::system::SystemInfo};
use parking_lot::Mutex;
use futures::sync::mpsc;
use futures::{prelude::*, future::Executor, sync::mpsc};
// Type aliases.
// These exist mainly to avoid typing `<F as Factory>::Foo` all over the code.
@@ -262,7 +261,7 @@ pub trait OffchainWorker<C: Components> {
number: &FactoryBlockNumber<C::Factory>,
offchain: &offchain::OffchainWorkers<ComponentClient<C>, ComponentBlock<C>>,
pool: &Arc<TransactionPool<C::TransactionPoolApi>>,
) -> error::Result<()>;
) -> error::Result<Box<dyn Future<Item = (), Error = ()> + Send>>;
}
impl<C: Components> OffchainWorker<Self> for C where
@@ -273,8 +272,8 @@ impl<C: Components> OffchainWorker<Self> for C where
number: &FactoryBlockNumber<C::Factory>,
offchain: &offchain::OffchainWorkers<ComponentClient<C>, ComponentBlock<C>>,
pool: &Arc<TransactionPool<C::TransactionPoolApi>>,
) -> error::Result<()> {
Ok(offchain.on_block_imported(number, pool))
) -> error::Result<Box<dyn Future<Item = (), Error = ()> + Send>> {
Ok(Box::new(offchain.on_block_imported(number, pool)))
}
}
@@ -298,6 +297,9 @@ impl<C: Components, T> ServiceTrait<C> for T where
+ OffchainWorker<C>
{}
/// Alias for a an implementation of `futures::future::Executor`.
pub type TaskExecutor = Arc<dyn Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;
/// A collection of types and methods to build a service on top of the substrate service.
pub trait ServiceFactory: 'static + Sized {
/// Block type.
@@ -351,10 +353,10 @@ pub trait ServiceFactory: 'static + Sized {
) -> Result<Self::SelectChain, error::Error>;
/// Build full service.
fn new_full(config: FactoryFullConfiguration<Self>, executor: TaskExecutor)
fn new_full(config: FactoryFullConfiguration<Self>)
-> Result<Self::FullService, error::Error>;
/// Build light service.
fn new_light(config: FactoryFullConfiguration<Self>, executor: TaskExecutor)
fn new_light(config: FactoryFullConfiguration<Self>)
-> Result<Self::LightService, error::Error>;
/// ImportQueue for a full client
@@ -455,12 +457,11 @@ pub struct FullComponents<Factory: ServiceFactory> {
impl<Factory: ServiceFactory> FullComponents<Factory> {
/// Create new `FullComponents`
pub fn new(
config: FactoryFullConfiguration<Factory>,
task_executor: TaskExecutor
config: FactoryFullConfiguration<Factory>
) -> Result<Self, error::Error> {
Ok(
Self {
service: Service::new(config, task_executor)?,
service: Service::new(config)?,
}
)
}
@@ -480,6 +481,15 @@ impl<Factory: ServiceFactory> DerefMut for FullComponents<Factory> {
}
}
impl<Factory: ServiceFactory> Future for FullComponents<Factory> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.service.poll()
}
}
impl<Factory: ServiceFactory> Components for FullComponents<Factory> {
type Factory = Factory;
type Executor = FullExecutor<Factory>;
@@ -555,11 +565,10 @@ impl<Factory: ServiceFactory> LightComponents<Factory> {
/// Create new `LightComponents`
pub fn new(
config: FactoryFullConfiguration<Factory>,
task_executor: TaskExecutor
) -> Result<Self, error::Error> {
Ok(
Self {
service: Service::new(config, task_executor)?,
service: Service::new(config)?,
}
)
}
@@ -573,6 +582,15 @@ impl<Factory: ServiceFactory> Deref for LightComponents<Factory> {
}
}
impl<Factory: ServiceFactory> Future for LightComponents<Factory> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.service.poll()
}
}
impl<Factory: ServiceFactory> Components for LightComponents<Factory> {
type Factory = Factory;
type Executor = LightExecutor<Factory>;
+106 -38
View File
@@ -36,7 +36,7 @@ use client::{BlockchainEvents, backend::Backend};
use exit_future::Signal;
use futures::prelude::*;
use keystore::Store as Keystore;
use log::{info, warn, debug};
use log::{info, warn, debug, error};
use parity_codec::{Encode, Decode};
use primitives::Pair;
use runtime_primitives::generic::BlockId;
@@ -67,7 +67,7 @@ pub use std::{ops::Deref, result::Result, sync::Arc};
#[doc(hidden)]
pub use network::{FinalityProofProvider, OnDemand};
#[doc(hidden)]
pub use tokio::runtime::TaskExecutor;
pub use futures::future::Executor;
const DEFAULT_PROTOCOL_ID: &str = "sup";
@@ -82,6 +82,14 @@ pub struct Service<Components: components::Components> {
keystore: Keystore,
exit: ::exit_future::Exit,
signal: Option<Signal>,
/// Sender for futures that must be spawned as background tasks.
to_spawn_tx: mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>,
/// Receiver for futures that must be spawned as background tasks.
to_spawn_rx: Mutex<mpsc::UnboundedReceiver<Box<dyn Future<Item = (), Error = ()> + Send>>>,
/// List of futures to poll from `poll`.
/// If spawning a background task is not possible, we instead push the task into this `Vec`.
/// The elements must then be polled manually.
to_poll: Mutex<Vec<Box<dyn Future<Item = (), Error = ()> + Send>>>,
/// Configuration of this Service
pub config: FactoryFullConfiguration<Components::Factory>,
_rpc: Box<dyn std::any::Any + Send + Sync>,
@@ -106,13 +114,9 @@ pub fn new_client<Factory: components::ServiceFactory>(config: &FactoryFullConfi
pub type TelemetryOnConnectNotifications = mpsc::UnboundedReceiver<()>;
/// Used to hook on telemetry connection established events.
pub struct TelemetryOnConnect<'a> {
/// Handle to a future that will resolve on exit.
pub on_exit: Box<dyn Future<Item=(), Error=()> + Send + 'static>,
pub struct TelemetryOnConnect {
/// Event stream.
pub telemetry_connection_sinks: TelemetryOnConnectNotifications,
/// Executor to which the hook is spawned.
pub executor: &'a TaskExecutor,
}
impl<Components: components::Components> Service<Components> {
@@ -126,10 +130,13 @@ impl<Components: components::Components> Service<Components> {
/// Creates a new service.
pub fn new(
mut config: FactoryFullConfiguration<Components::Factory>,
task_executor: TaskExecutor,
) -> Result<Self, error::Error> {
let (signal, exit) = ::exit_future::signal();
// List of asynchronous tasks to spawn. We collect them, then spawn them all at once.
let (to_spawn_tx, to_spawn_rx) =
mpsc::unbounded::<Box<dyn Future<Item = (), Error = ()> + Send>>();
// Create client
let executor = NativeExecutor::new(config.default_heap_pages);
@@ -209,16 +216,13 @@ impl<Components: components::Components> Service<Components> {
let network = network_mut.service().clone();
let network_status_sinks = Arc::new(Mutex::new(Vec::new()));
task_executor.spawn(build_network_future(network_mut, network_status_sinks.clone())
let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future(network_mut, network_status_sinks.clone())
.map_err(|_| ())
.select(exit.clone())
.then(|_| Ok(())));
.then(|_| Ok(()))));
let offchain_workers = if config.offchain_worker {
Some(Arc::new(offchain::OffchainWorkers::new(
client.clone(),
task_executor.clone(),
)))
Some(Arc::new(offchain::OffchainWorkers::new(client.clone())))
} else {
None
};
@@ -229,6 +233,7 @@ impl<Components: components::Components> Service<Components> {
let txpool = Arc::downgrade(&transaction_pool);
let wclient = Arc::downgrade(&client);
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
let to_spawn_tx_ = to_spawn_tx.clone();
let events = client.import_notification_stream()
.for_each(move |notification| {
@@ -247,18 +252,19 @@ impl<Components: components::Components> Service<Components> {
}
if let (Some(txpool), Some(offchain)) = (txpool.upgrade(), offchain.as_ref().and_then(|o| o.upgrade())) {
Components::RuntimeServices::offchain_workers(
let future = Components::RuntimeServices::offchain_workers(
&number,
&offchain,
&txpool,
).map_err(|e| warn!("Offchain workers error processing new block: {:?}", e))?;
let _ = to_spawn_tx_.unbounded_send(future);
}
Ok(())
})
.select(exit.clone())
.then(|_| Ok(()));
task_executor.spawn(events);
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
{
@@ -304,7 +310,7 @@ impl<Components: components::Components> Service<Components> {
.select(exit.clone())
.then(|_| Ok(()));
task_executor.spawn(events);
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
{
@@ -326,7 +332,7 @@ impl<Components: components::Components> Service<Components> {
.select(exit.clone())
.then(|_| Ok(()));
task_executor.spawn(events);
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
// Periodically notify the telemetry.
@@ -337,7 +343,7 @@ impl<Components: components::Components> Service<Components> {
let self_pid = get_current_pid();
let (netstat_tx, netstat_rx) = mpsc::unbounded();
network_status_sinks.lock().push(netstat_tx);
task_executor.spawn(netstat_rx.for_each(move |net_status| {
let tel_task = netstat_rx.for_each(move |net_status| {
let info = client_.info();
let best_number = info.chain.best_number.saturated_into::<u64>();
let best_hash = info.chain.best_hash;
@@ -380,7 +386,8 @@ impl<Components: components::Components> Service<Components> {
);
Ok(())
}).select(exit.clone()).then(|_| Ok(())));
}).select(exit.clone()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task));
// RPC
let system_info = rpc::apis::system::SystemInfo {
@@ -390,6 +397,19 @@ impl<Components: components::Components> Service<Components> {
properties: config.chain_spec.properties(),
};
let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded();
struct ExecutorWithTx(mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>);
impl futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for ExecutorWithTx {
fn execute(
&self,
future: Box<dyn Future<Item = (), Error = ()> + Send>
) -> Result<(), futures::future::ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> {
self.0.unbounded_send(future)
.map_err(|err| {
let kind = futures::future::ExecuteErrorKind::Shutdown;
futures::future::ExecuteError::new(kind, err.into_inner())
})
}
}
let rpc = Components::RuntimeServices::start_rpc(
client.clone(),
system_rpc_tx,
@@ -398,14 +418,14 @@ impl<Components: components::Components> Service<Components> {
config.rpc_ws,
config.rpc_ws_max_connections,
config.rpc_cors.clone(),
task_executor.clone(),
Arc::new(ExecutorWithTx(to_spawn_tx.clone())),
transaction_pool.clone(),
)?;
task_executor.spawn(build_system_rpc_handler::<Components>(
let _ = to_spawn_tx.unbounded_send(Box::new(build_system_rpc_handler::<Components>(
network.clone(),
system_rpc_rx,
has_bootnodes
));
)));
let telemetry_connection_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>> = Default::default();
@@ -444,9 +464,9 @@ impl<Components: components::Components> Service<Components> {
});
Ok(())
});
task_executor.spawn(future
let _ = to_spawn_tx.unbounded_send(Box::new(future
.select(exit.clone())
.then(|_| Ok(())));
.then(|_| Ok(()))));
telemetry
});
@@ -457,6 +477,9 @@ impl<Components: components::Components> Service<Components> {
select_chain,
transaction_pool,
signal: Some(signal),
to_spawn_tx,
to_spawn_rx: Mutex::new(to_spawn_rx),
to_poll: Mutex::new(Vec::new()),
keystore,
config,
exit,
@@ -484,9 +507,12 @@ impl<Components: components::Components> Service<Components> {
pub fn telemetry(&self) -> Option<tel::Telemetry> {
self._telemetry.as_ref().map(|t| t.clone())
}
}
impl<Components> Service<Components> where Components: components::Components {
/// Spawns a task in the background that runs the future passed as parameter.
pub fn spawn_task(&self, task: Box<dyn Future<Item = (), Error = ()> + Send>) {
let _ = self.to_spawn_tx.unbounded_send(task);
}
/// Get shared client instance.
pub fn client(&self) -> Arc<ComponentClient<Components>> {
self.client.clone()
@@ -525,6 +551,50 @@ impl<Components> Service<Components> where Components: components::Components {
}
}
impl<Components> Future for Service<Components> where Components: components::Components {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Future::poll(&mut &*self)
}
}
// Note that this implementation is totally unnecessary. It exists only because of tests. The tests
// should eventually be reworked, as it would make it possible to remove the `Mutex`es. that we
// lock here.
impl<'a, Components> Future for &'a Service<Components> where Components: components::Components {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// The user is supposed to poll only one service, so it doesn't matter if we keep this
// mutex locked.
let mut to_poll = self.to_poll.lock();
let mut to_spawn_rx = self.to_spawn_rx.lock();
while let Ok(Async::Ready(Some(task_to_spawn))) = to_spawn_rx.poll() {
let executor = tokio_executor::DefaultExecutor::current();
if let Err(err) = executor.execute(task_to_spawn) {
debug!(
target: "service",
"Failed to spawn background task: {:?}; falling back to manual polling",
err
);
to_poll.push(err.into_future());
}
}
// Polling all the `to_poll` futures.
while let Some(pos) = to_poll.iter_mut().position(|t| t.poll().map(|t| t.is_ready()).unwrap_or(true)) {
to_poll.remove(pos);
}
// The service future never ends.
Ok(Async::NotReady)
}
}
/// Builds a never-ending future that continuously polls the network.
///
/// The `status_sink` contain a list of senders to send a periodic network status to.
@@ -726,7 +796,7 @@ fn build_system_rpc_handler<Components: components::Components>(
/// ```
/// # use substrate_service::{
/// # construct_service_factory, Service, FullBackend, FullExecutor, LightBackend, LightExecutor,
/// # FullComponents, LightComponents, FactoryFullConfiguration, FullClient, TaskExecutor
/// # FullComponents, LightComponents, FactoryFullConfiguration, FullClient
/// # };
/// # use transaction_pool::{self, txpool::{Pool as TransactionPool}};
/// # use network::construct_simple_protocol;
@@ -775,14 +845,14 @@ fn build_system_rpc_handler<Components: components::Components>(
/// Genesis = GenesisConfig,
/// Configuration = (),
/// FullService = FullComponents<Self>
/// { |config, executor| <FullComponents<Factory>>::new(config, executor) },
/// { |config| <FullComponents<Factory>>::new(config) },
/// // Setup as Consensus Authority (if the role and key are given)
/// AuthoritySetup = {
/// |service: Self::FullService, executor: TaskExecutor, key: Option<Arc<ed25519::Pair>>| {
/// |service: Self::FullService, key: Option<Arc<ed25519::Pair>>| {
/// Ok(service)
/// }},
/// LightService = LightComponents<Self>
/// { |config, executor| <LightComponents<Factory>>::new(config, executor) },
/// { |config| <LightComponents<Factory>>::new(config) },
/// FullImportQueue = BasicQueue<Block>
/// { |_, client, _| Ok(BasicQueue::new(Arc::new(MyVerifier), client, None, None, None)) },
/// LightImportQueue = BasicQueue<Block>
@@ -893,21 +963,19 @@ macro_rules! construct_service_factory {
}
fn new_light(
config: $crate::FactoryFullConfiguration<Self>,
executor: $crate::TaskExecutor
config: $crate::FactoryFullConfiguration<Self>
) -> $crate::Result<Self::LightService, $crate::Error>
{
( $( $light_service_init )* ) (config, executor)
( $( $light_service_init )* ) (config)
}
fn new_full(
config: $crate::FactoryFullConfiguration<Self>,
executor: $crate::TaskExecutor,
config: $crate::FactoryFullConfiguration<Self>
) -> Result<Self::FullService, $crate::Error>
{
( $( $full_service_init )* ) (config, executor.clone()).and_then(|service| {
( $( $full_service_init )* ) (config).and_then(|service| {
let key = (&service).authority_key().map(Arc::new);
($( $authority_setup )*)(service, executor, key)
($( $authority_setup )*)(service, key)
})
}
}