Update the service to std futures (#4447)

* Switch service to futures03

* Fix tests

* Fix service test and cli

* Re-add Executor trait to SpawnTaskHandle

* Fix node-service

* Update babe

* Fix browser node

* Update aura

* Revert back to tokio-executor to fix runtime panic

* Add todo item

* Fix service tests again

* Timeout test futures

* Fix tests

* nits

* Fix service test

* Remove zstd patch

* Re-add futures01 to aura and babe tests as a dev-dep

* Change failing test to tee

* Fix node

* Upgrade tokio

* fix society

* Start switching grandpa to stable futures

* Revert "Start switching grandpa to stable futures"

This reverts commit 9c1976346237637effc07c13f7d0403daf5e71cf.

* Fix utils

* Revert substrate service test

* Revert gitlab

Co-authored-by: thiolliere <gui.thiolliere@gmail.com>
This commit is contained in:
Ashley
2020-01-14 15:43:45 +01:00
committed by GitHub
parent 972be34e38
commit 3219be2508
24 changed files with 246 additions and 312 deletions
+35 -51
View File
@@ -27,12 +27,10 @@ use sc_client_api::{
use sc_client::Client;
use sc_chain_spec::{RuntimeGenesis, Extension};
use sp_consensus::import_queue::ImportQueue;
use futures::{prelude::*, sync::mpsc};
use futures03::{
compat::Compat,
FutureExt as _, TryFutureExt as _,
StreamExt as _, TryStreamExt as _,
future::{select, Either}
use futures::{
Future, FutureExt, StreamExt,
channel::mpsc,
future::{select, ready}
};
use sc_keystore::{Store as Keystore};
use log::{info, warn, error};
@@ -47,7 +45,7 @@ use sp_api::ProvideRuntimeApi;
use sc_executor::{NativeExecutor, NativeExecutionDispatch};
use std::{
io::{Read, Write, Seek},
marker::PhantomData, sync::Arc, time::SystemTime
marker::PhantomData, sync::Arc, time::SystemTime, pin::Pin
};
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
@@ -682,7 +680,7 @@ pub trait ServiceBuilderCommand {
self,
input: impl Read + Seek + Send + 'static,
force: bool,
) -> Box<dyn Future<Item = (), Error = Error> + Send>;
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
/// Performs the blocks export.
fn export_blocks(
@@ -691,7 +689,7 @@ pub trait ServiceBuilderCommand {
from: NumberFor<Self::Block>,
to: Option<NumberFor<Self::Block>>,
json: bool
) -> Box<dyn Future<Item = (), Error = Error>>;
) -> Pin<Box<dyn Future<Output = Result<(), Error>>>>;
/// Performs a revert of `blocks` blocks.
fn revert_chain(
@@ -703,7 +701,7 @@ pub trait ServiceBuilderCommand {
fn check_block(
self,
block: BlockId<Self::Block>
) -> Box<dyn Future<Item = (), Error = Error> + Send>;
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
}
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TBackend, TExec, TSc, TImpQu, TNetP, TExPool, TRpc>
@@ -795,7 +793,7 @@ ServiceBuilder<
// List of asynchronous tasks to spawn. We collect them, then spawn them all at once.
let (to_spawn_tx, to_spawn_rx) =
mpsc::unbounded::<Box<dyn Future<Item = (), Error = ()> + Send>>();
mpsc::unbounded::<Pin<Box<dyn Future<Output = ()> + Send>>>();
// A side-channel for essential tasks to communicate shutdown.
let (essential_failed_tx, essential_failed_rx) = mpsc::unbounded();
@@ -879,7 +877,6 @@ ServiceBuilder<
let is_validator = config.roles.is_authority();
let events = client.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |notification| {
let txpool = txpool.upgrade();
@@ -887,8 +884,8 @@ ServiceBuilder<
let future = txpool.maintain(
&BlockId::hash(notification.hash),
&notification.retracted,
).map(|_| Ok(())).compat();
let _ = to_spawn_tx_.unbounded_send(Box::new(future));
);
let _ = to_spawn_tx_.unbounded_send(Box::pin(future));
}
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
@@ -897,15 +894,13 @@ ServiceBuilder<
&notification.header,
network_state_info.clone(),
is_validator
).map(|()| Ok(()));
let _ = to_spawn_tx_.unbounded_send(Box::new(Compat::new(future)));
);
let _ = to_spawn_tx_.unbounded_send(Box::pin(future));
}
Ok(())
})
.select(exit.clone().map(Ok).compat())
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
ready(())
});
let _ = to_spawn_tx.unbounded_send(Box::pin(select(events, exit.clone()).map(drop)));
}
{
@@ -913,7 +908,6 @@ ServiceBuilder<
let network = Arc::downgrade(&network);
let transaction_pool_ = transaction_pool.clone();
let events = transaction_pool.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |_| {
if let Some(network) = network.upgrade() {
network.trigger_repropagate();
@@ -923,12 +917,10 @@ ServiceBuilder<
"ready" => status.ready,
"future" => status.future
);
Ok(())
})
.select(exit.clone().map(Ok).compat())
.then(|_| Ok(()));
ready(())
});
let _ = to_spawn_tx.unbounded_send(Box::new(events));
let _ = to_spawn_tx.unbounded_send(Box::pin(select(events, exit.clone()).map(drop)));
}
// Periodically notify the telemetry.
@@ -990,9 +982,9 @@ ServiceBuilder<
"disk_write_per_sec" => info.usage.as_ref().map(|usage| usage.io.bytes_written).unwrap_or(0),
);
Ok(())
}).select(exit.clone().map(Ok).compat()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task));
ready(())
});
let _ = to_spawn_tx.unbounded_send(Box::pin(select(tel_task, exit.clone()).map(drop)));
// Periodically send the network state to the telemetry.
let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
@@ -1003,12 +995,12 @@ ServiceBuilder<
"system.network_state";
"state" => network_state,
);
Ok(())
}).select(exit.clone().map(Ok).compat()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task_2));
ready(())
});
let _ = to_spawn_tx.unbounded_send(Box::pin(select(tel_task_2, exit.clone()).map(drop)));
// RPC
let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded();
let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded();
let gen_handler = || {
use sc_rpc::{chain, state, author, system};
@@ -1068,17 +1060,14 @@ ServiceBuilder<
let rpc = start_rpc_servers(&config, gen_handler)?;
let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future(
let _ = to_spawn_tx.unbounded_send(Box::pin(select(build_network_future(
config.roles,
network_mut,
client.clone(),
network_status_sinks.clone(),
system_rpc_rx,
has_bootnodes,
)
.map_err(|_| ())
.select(exit.clone().map(Ok).compat())
.then(|_| Ok(()))));
), exit.clone()).map(drop)));
let telemetry_connection_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>> = Default::default();
@@ -1099,8 +1088,6 @@ ServiceBuilder<
.map(|dur| dur.as_millis())
.unwrap_or(0);
let future = telemetry.clone()
.map(|ev| Ok::<_, ()>(ev))
.compat()
.for_each(move |event| {
// Safe-guard in case we add more events in the future.
let sc_telemetry::TelemetryEvent::Connected = event;
@@ -1119,11 +1106,11 @@ ServiceBuilder<
telemetry_connection_sinks_.lock().retain(|sink| {
sink.unbounded_send(()).is_ok()
});
Ok(())
ready(())
});
let _ = to_spawn_tx.unbounded_send(Box::new(future
.select(exit.clone().map(Ok).compat())
.then(|_| Ok(()))));
let _ = to_spawn_tx.unbounded_send(Box::pin(select(
future, exit.clone()
).map(drop)));
telemetry
});
@@ -1132,13 +1119,10 @@ ServiceBuilder<
let future = select(
grafana_data_source::run_server(port).boxed(),
exit.clone()
).map(|either| match either {
Either::Left((result, _)) => result.map_err(|_| ()),
Either::Right(_) => Ok(())
}).compat();
).map(drop);
let _ = to_spawn_tx.unbounded_send(Box::new(future));
}
let _ = to_spawn_tx.unbounded_send(Box::pin(future));
}
// Instrumentation
if let Some(tracing_targets) = config.tracing_targets.as_ref() {
+10 -15
View File
@@ -22,9 +22,6 @@ use crate::error::Error;
use sc_chain_spec::{ChainSpec, RuntimeGenesis, Extension};
use log::{warn, info};
use futures::{future, prelude::*};
use futures03::{
TryFutureExt as _,
};
use sp_runtime::traits::{
Block as BlockT, NumberFor, One, Zero, Header, SaturatedConversion
};
@@ -34,9 +31,7 @@ use sc_client::Client;
use sp_consensus::import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult, ImportQueue};
use sp_consensus::BlockOrigin;
use std::{
io::{Read, Write, Seek},
};
use std::{io::{Read, Write, Seek}, pin::Pin};
use sc_network::message;
@@ -68,7 +63,7 @@ impl<
self,
input: impl Read + Seek + Send + 'static,
force: bool,
) -> Box<dyn Future<Item = (), Error = Error> + Send> {
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
struct WaitLink {
imported_blocks: u64,
has_error: bool,
@@ -117,7 +112,7 @@ impl<
// queue, the `Future` re-schedules itself and returns `Poll::Pending`.
// This makes it possible either to interleave other operations in-between the block imports,
// or to stop the operation completely.
let import = futures03::future::poll_fn(move |cx| {
let import = future::poll_fn(move |cx| {
// Start by reading the number of blocks if not done so already.
let count = match count {
Some(c) => c,
@@ -205,7 +200,7 @@ impl<
return std::task::Poll::Pending;
}
});
Box::new(import.compat())
Box::pin(import)
}
fn export_blocks(
@@ -214,7 +209,7 @@ impl<
from: NumberFor<TBl>,
to: Option<NumberFor<TBl>>,
json: bool
) -> Box<dyn Future<Item = (), Error = Error>> {
) -> Pin<Box<dyn Future<Output = Result<(), Error>>>> {
let client = self.client;
let mut block = from;
@@ -233,7 +228,7 @@ impl<
// `Poll::Pending`.
// This makes it possible either to interleave other operations in-between the block exports,
// or to stop the operation completely.
let export = futures03::future::poll_fn(move |cx| {
let export = future::poll_fn(move |cx| {
if last < block {
return std::task::Poll::Ready(Err("Invalid block range specified".into()));
}
@@ -274,7 +269,7 @@ impl<
std::task::Poll::Pending
});
Box::new(export.compat())
Box::pin(export)
}
fn revert_chain(
@@ -295,7 +290,7 @@ impl<
fn check_block(
self,
block_id: BlockId<TBl>
) -> Box<dyn Future<Item = (), Error = Error> + Send> {
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
match self.client.block(&block_id) {
Ok(Some(block)) => {
let mut buf = Vec::new();
@@ -304,8 +299,8 @@ impl<
let reader = std::io::Cursor::new(buf);
self.import_blocks(reader, true)
}
Ok(None) => Box::new(future::err("Unknown block".into())),
Err(e) => Box::new(future::err(format!("Error reading block: {:?}", e).into())),
Ok(None) => Box::pin(future::err("Unknown block".into())),
Err(e) => Box::pin(future::err(format!("Error reading block: {:?}", e).into())),
}
}
}
+85 -101
View File
@@ -32,15 +32,17 @@ use std::marker::PhantomData;
use std::net::SocketAddr;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use futures::sync::mpsc;
use std::task::{Poll, Context};
use parking_lot::Mutex;
use sc_client::Client;
use exit_future::Signal;
use futures::prelude::*;
use futures03::{
future::{ready, FutureExt as _, TryFutureExt as _},
stream::{StreamExt as _, TryStreamExt as _},
use futures::{
Future, FutureExt, Stream, StreamExt, TryFutureExt,
future::select, channel::mpsc,
compat::*,
sink::SinkExt,
task::{Spawn, SpawnExt, FutureObj, SpawnError},
};
use sc_network::{
NetworkService, NetworkState, specialization::NetworkSpecialization,
@@ -67,8 +69,6 @@ pub use sc_rpc::Metadata as RpcMetadata;
pub use std::{ops::Deref, result::Result, sync::Arc};
#[doc(hidden)]
pub use sc_network::{FinalityProofProvider, OnDemand, config::BoxFinalityProofRequestBuilder};
#[doc(hidden)]
pub use futures::future::Executor;
const DEFAULT_PROTOCOL_ID: &str = "sup";
@@ -92,13 +92,13 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
/// A receiver for spawned essential-tasks concluding.
essential_failed_rx: mpsc::UnboundedReceiver<()>,
/// Sender for futures that must be spawned as background tasks.
to_spawn_tx: mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>,
to_spawn_tx: mpsc::UnboundedSender<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// Receiver for futures that must be spawned as background tasks.
to_spawn_rx: mpsc::UnboundedReceiver<Box<dyn Future<Item = (), Error = ()> + Send>>,
to_spawn_rx: mpsc::UnboundedReceiver<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// List of futures to poll from `poll`.
/// If spawning a background task is not possible, we instead push the task into this `Vec`.
/// The elements must then be polled manually.
to_poll: Vec<Box<dyn Future<Item = (), Error = ()> + Send>>,
to_poll: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
rpc_handlers: sc_rpc_server::RpcHandler<sc_rpc::Metadata>,
_rpc: Box<dyn std::any::Any + Send + Sync>,
_telemetry: Option<sc_telemetry::Telemetry>,
@@ -109,42 +109,36 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
}
/// Alias for a an implementation of `futures::future::Executor`.
pub type TaskExecutor = Arc<dyn Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;
pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
/// An handle for spawning tasks in the service.
#[derive(Clone)]
pub struct SpawnTaskHandle {
sender: mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>,
sender: mpsc::UnboundedSender<Pin<Box<dyn Future<Output = ()> + Send>>>,
on_exit: exit_future::Exit,
}
impl Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for SpawnTaskHandle {
fn execute(
&self,
future: Box<dyn Future<Item = (), Error = ()> + Send>,
) -> Result<(), futures::future::ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> {
let exit = self.on_exit.clone().map(Ok).compat();
let future = Box::new(future.select(exit).then(|_| Ok(())));
if let Err(err) = self.sender.unbounded_send(future) {
let kind = futures::future::ExecuteErrorKind::Shutdown;
Err(futures::future::ExecuteError::new(kind, err.into_inner()))
} else {
Ok(())
}
impl Spawn for SpawnTaskHandle {
fn spawn_obj(&self, future: FutureObj<'static, ()>)
-> Result<(), SpawnError> {
let future = select(self.on_exit.clone(), future).map(drop);
self.sender.unbounded_send(Box::pin(future))
.map_err(|_| SpawnError::shutdown())
}
}
impl futures03::task::Spawn for SpawnTaskHandle {
fn spawn_obj(&self, future: futures03::task::FutureObj<'static, ()>)
-> Result<(), futures03::task::SpawnError> {
self.execute(Box::new(futures03::compat::Compat::new(future.unit_error())))
.map_err(|_| futures03::task::SpawnError::shutdown())
type Boxed01Future01 = Box<dyn futures01::Future<Item = (), Error = ()> + Send + 'static>;
impl futures01::future::Executor<Boxed01Future01> for SpawnTaskHandle {
fn execute(&self, future: Boxed01Future01) -> Result<(), futures01::future::ExecuteError<Boxed01Future01>>{
self.spawn(future.compat().map(drop));
Ok(())
}
}
/// Abstraction over a Substrate service.
pub trait AbstractService: 'static + Future<Item = (), Error = Error> +
Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send {
pub trait AbstractService: 'static + Future<Output = Result<(), Error>> +
Spawn + Send + Unpin {
/// Type of block of this chain.
type Block: BlockT;
/// Backend storage for the client.
@@ -168,12 +162,12 @@ pub trait AbstractService: 'static + Future<Item = (), Error = Error> +
fn telemetry(&self) -> Option<sc_telemetry::Telemetry>;
/// Spawns a task in the background that runs the future passed as parameter.
fn spawn_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static);
fn spawn_task(&self, task: impl Future<Output = ()> + Send + Unpin + 'static);
/// Spawns a task in the background that runs the future passed as
/// parameter. The given task is considered essential, i.e. if it errors we
/// trigger a service exit.
fn spawn_essential_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static);
fn spawn_essential_task(&self, task: impl Future<Output = ()> + Send + Unpin + 'static);
/// Returns a handle for spawning tasks.
fn spawn_task_handle(&self) -> SpawnTaskHandle;
@@ -190,7 +184,7 @@ pub trait AbstractService: 'static + Future<Item = (), Error = Error> +
///
/// If the request subscribes you to events, the `Sender` in the `RpcSession` object is used to
/// send back spontaneous events.
fn rpc_query(&self, mem: &RpcSession, request: &str) -> Box<dyn Future<Item = Option<String>, Error = ()> + Send>;
fn rpc_query(&self, mem: &RpcSession, request: &str) -> Pin<Box<dyn Future<Output = Option<String>> + Send>>;
/// Get shared client instance.
fn client(&self) -> Arc<sc_client::Client<Self::Backend, Self::CallExecutor, Self::Block, Self::RuntimeApi>>;
@@ -216,11 +210,11 @@ impl<TBl, TBackend, TExec, TRtApi, TSc, TNetSpec, TExPool, TOc> AbstractService
Service<TBl, Client<TBackend, TExec, TBl, TRtApi>, TSc, NetworkStatus<TBl>,
NetworkService<TBl, TNetSpec, TBl::Hash>, TExPool, TOc>
where
TBl: BlockT,
TBl: BlockT + Unpin,
TBackend: 'static + sc_client_api::backend::Backend<TBl>,
TExec: 'static + sc_client::CallExecutor<TBl> + Send + Sync + Clone,
TRtApi: 'static + Send + Sync,
TSc: sp_consensus::SelectChain<TBl> + 'static + Clone + Send,
TSc: sp_consensus::SelectChain<TBl> + 'static + Clone + Send + Unpin,
TExPool: 'static + TransactionPool<Block = TBl>
+ TransactionPoolMaintainer<Block = TBl>,
TOc: 'static + Send + Sync,
@@ -248,25 +242,22 @@ where
self.keystore.clone()
}
fn spawn_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) {
let exit = self.on_exit().map(Ok).compat();
let task = task.select(exit).then(|_| Ok(()));
let _ = self.to_spawn_tx.unbounded_send(Box::new(task));
fn spawn_task(&self, task: impl Future<Output = ()> + Send + Unpin + 'static) {
let task = select(self.on_exit(), task).map(drop);
let _ = self.to_spawn_tx.unbounded_send(Box::pin(task));
}
fn spawn_essential_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) {
let essential_failed = self.essential_failed_tx.clone();
fn spawn_essential_task(&self, task: impl Future<Output = ()> + Send + Unpin + 'static) {
let mut essential_failed = self.essential_failed_tx.clone();
let essential_task = std::panic::AssertUnwindSafe(task)
.catch_unwind()
.then(move |_| {
.map(move |_| {
error!("Essential task failed. Shutting down service.");
let _ = essential_failed.send(());
Ok(())
});
let exit = self.on_exit().map(Ok::<_, ()>).compat();
let task = essential_task.select(exit).then(|_| Ok(()));
let task = select(self.on_exit(), essential_task).map(drop);
let _ = self.to_spawn_tx.unbounded_send(Box::new(task));
let _ = self.to_spawn_tx.unbounded_send(Box::pin(task));
}
fn spawn_task_handle(&self) -> SpawnTaskHandle {
@@ -276,8 +267,12 @@ where
}
}
fn rpc_query(&self, mem: &RpcSession, request: &str) -> Box<dyn Future<Item = Option<String>, Error = ()> + Send> {
Box::new(self.rpc_handlers.handle_request(request, mem.metadata.clone()))
fn rpc_query(&self, mem: &RpcSession, request: &str) -> Pin<Box<dyn Future<Output = Option<String>> + Send>> {
Box::pin(
self.rpc_handlers.handle_request(request, mem.metadata.clone())
.compat()
.map(|res| res.expect("this should never fail"))
)
}
fn client(&self) -> Arc<sc_client::Client<Self::Backend, Self::CallExecutor, Self::Block, Self::RuntimeApi>> {
@@ -309,57 +304,56 @@ where
}
}
impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Future for
impl<TBl: Unpin, TCl, TSc: Unpin, TNetStatus, TNet, TTxPool, TOc> Future for
Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc>
{
type Item = ();
type Error = Error;
type Output = Result<(), Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.essential_failed_rx.poll() {
Ok(Async::NotReady) => {},
Ok(Async::Ready(_)) | Err(_) => {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
match Pin::new(&mut this.essential_failed_rx).poll_next(cx) {
Poll::Pending => {},
Poll::Ready(_) => {
// Ready(None) should not be possible since we hold a live
// sender.
return Err(Error::Other("Essential task failed.".into()));
return Poll::Ready(Err(Error::Other("Essential task failed.".into())));
}
}
while let Ok(Async::Ready(Some(task_to_spawn))) = self.to_spawn_rx.poll() {
while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) {
// TODO: Update to tokio 0.2 when libp2p get switched to std futures (#4383)
let executor = tokio_executor::DefaultExecutor::current();
if let Err(err) = executor.execute(task_to_spawn) {
use futures01::future::Executor;
if let Err(err) = executor.execute(task_to_spawn.unit_error().compat()) {
debug!(
target: "service",
"Failed to spawn background task: {:?}; falling back to manual polling",
err
);
self.to_poll.push(err.into_future());
this.to_poll.push(Box::pin(err.into_future().compat().map(drop)));
}
}
// Polling all the `to_poll` futures.
while let Some(pos) = self.to_poll.iter_mut().position(|t| t.poll().map(|t| t.is_ready()).unwrap_or(true)) {
let _ = self.to_poll.remove(pos);
while let Some(pos) = this.to_poll.iter_mut().position(|t| Pin::new(t).poll(cx).is_ready()) {
let _ = this.to_poll.remove(pos);
}
// The service future never ends.
Ok(Async::NotReady)
Poll::Pending
}
}
impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for
impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Spawn for
Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc>
{
fn execute(
fn spawn_obj(
&self,
future: Box<dyn Future<Item = (), Error = ()> + Send>
) -> Result<(), futures::future::ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> {
if let Err(err) = self.to_spawn_tx.unbounded_send(future) {
let kind = futures::future::ExecuteErrorKind::Shutdown;
Err(futures::future::ExecuteError::new(kind, err.into_inner()))
} else {
Ok(())
}
future: FutureObj<'static, ()>
) -> Result<(), SpawnError> {
self.to_spawn_tx.unbounded_send(Box::pin(future))
.map_err(|_| SpawnError::shutdown())
}
}
@@ -376,29 +370,23 @@ fn build_network_future<
mut network: sc_network::NetworkWorker<B, S, H>,
client: Arc<C>,
status_sinks: Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<B>, NetworkState)>>>,
rpc_rx: futures03::channel::mpsc::UnboundedReceiver<sc_rpc::system::Request<B>>,
mut rpc_rx: mpsc::UnboundedReceiver<sc_rpc::system::Request<B>>,
should_have_peers: bool,
) -> impl Future<Item = (), Error = ()> {
// Compatibility shim while we're transitioning to stable Futures.
// See https://github.com/paritytech/substrate/issues/3099
let mut rpc_rx = futures03::compat::Compat::new(rpc_rx.map(|v| Ok::<_, ()>(v)));
) -> impl Future<Output = ()> {
let mut imported_blocks_stream = client.import_notification_stream().fuse();
let mut finality_notification_stream = client.finality_notification_stream().fuse();
let mut imported_blocks_stream = client.import_notification_stream().fuse()
.map(|v| Ok::<_, ()>(v)).compat();
let mut finality_notification_stream = client.finality_notification_stream().fuse()
.map(|v| Ok::<_, ()>(v)).compat();
futures::future::poll_fn(move || {
futures::future::poll_fn(move |cx| {
let before_polling = Instant::now();
// We poll `imported_blocks_stream`.
while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() {
while let Poll::Ready(Some(notification)) = Pin::new(&mut imported_blocks_stream).poll_next(cx) {
network.on_block_imported(notification.hash, notification.header, Vec::new(), notification.is_new_best);
}
// We poll `finality_notification_stream`, but we only take the last event.
let mut last = None;
while let Ok(Async::Ready(Some(item))) = finality_notification_stream.poll() {
while let Poll::Ready(Some(item)) = Pin::new(&mut finality_notification_stream).poll_next(cx) {
last = Some(item);
}
if let Some(notification) = last {
@@ -406,7 +394,7 @@ fn build_network_future<
}
// Poll the RPC requests and answer them.
while let Ok(Async::Ready(Some(request))) = rpc_rx.poll() {
while let Poll::Ready(Some(request)) = Pin::new(&mut rpc_rx).poll_next(cx) {
match request {
sc_rpc::system::Request::Health(sender) => {
let _ = sender.send(sc_rpc::system::Health {
@@ -466,7 +454,7 @@ fn build_network_future<
}
// Interval report for the external API.
status_sinks.lock().poll(|| {
status_sinks.lock().poll(cx, || {
let status = NetworkStatus {
sync_state: network.sync_state(),
best_seen_block: network.best_seen_block(),
@@ -481,12 +469,10 @@ fn build_network_future<
});
// Main network polling.
let mut net_poll = futures03::future::poll_fn(|cx| futures03::future::Future::poll(Pin::new(&mut network), cx))
.compat();
if let Ok(Async::Ready(())) = net_poll.poll().map_err(|err| {
if let Poll::Ready(Ok(())) = Pin::new(&mut network).poll(cx).map_err(|err| {
warn!(target: "service", "Error in network: {:?}", err);
}) {
return Ok(Async::Ready(()));
return Poll::Ready(());
}
// Now some diagnostic for performances.
@@ -498,7 +484,7 @@ fn build_network_future<
polling_dur
);
Ok(Async::NotReady)
Poll::Pending
})
}
@@ -596,7 +582,7 @@ impl RpcSession {
/// messages.
///
/// The `RpcSession` must be kept alive in order to receive messages on the sender.
pub fn new(sender: mpsc::Sender<String>) -> RpcSession {
pub fn new(sender: futures01::sync::mpsc::Sender<String>) -> RpcSession {
RpcSession {
metadata: sender.into(),
}
@@ -668,7 +654,7 @@ where
let best_block_id = BlockId::hash(self.client.info().best_hash);
let import_future = self.pool.submit_one(&best_block_id, uxt);
let import_future = import_future
.then(move |import_result| {
.map(move |import_result| {
match import_result {
Ok(_) => report_handle.report_peer(who, reputation_change_good),
Err(e) => match e.into_pool_error() {
@@ -680,11 +666,9 @@ where
Err(e) => debug!("Error converting pool error: {:?}", e),
}
}
ready(Ok(()))
})
.compat();
});
if let Err(e) = self.executor.execute(Box::new(import_future)) {
if let Err(e) = self.executor.spawn(Box::new(import_future)) {
warn!("Error scheduling extrinsic import: {:?}", e);
}
}
@@ -700,7 +684,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use futures03::executor::block_on;
use futures::executor::block_on;
use sp_consensus::SelectChain;
use sp_runtime::traits::BlindCheckable;
use substrate_test_runtime_client::{prelude::*, runtime::{Extrinsic, Transfer}};
+33 -33
View File
@@ -14,11 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use futures::prelude::*;
use futures::sync::mpsc;
use futures::stream::futures_unordered::FuturesUnordered;
use std::time::{Duration, Instant};
use tokio_timer::Delay;
use futures::{Stream, stream::futures_unordered::FuturesUnordered, channel::mpsc};
use std::time::Duration;
use std::pin::Pin;
use std::task::{Poll, Context};
use futures_timer::Delay;
/// Holds a list of `UnboundedSender`s, each associated with a certain time period. Every time the
/// period elapses, we push an element on the sender.
@@ -29,7 +29,7 @@ pub struct StatusSinks<T> {
}
struct YieldAfter<T> {
delay: tokio_timer::Delay,
delay: Delay,
interval: Duration,
sender: Option<mpsc::UnboundedSender<T>>,
}
@@ -47,7 +47,7 @@ impl<T> StatusSinks<T> {
/// The `interval` is the time period between two pushes on the sender.
pub fn push(&mut self, interval: Duration, sender: mpsc::UnboundedSender<T>) {
self.entries.push(YieldAfter {
delay: Delay::new(Instant::now() + interval),
delay: Delay::new(interval),
interval,
sender: Some(sender),
})
@@ -57,16 +57,16 @@ impl<T> StatusSinks<T> {
/// pushes what it returns to the sender.
///
/// This function doesn't return anything, but it should be treated as if it implicitly
/// returns `Ok(Async::NotReady)`. In particular, it should be called again when the task
/// returns `Poll::Pending`. In particular, it should be called again when the task
/// is waken up.
///
/// # Panic
///
/// Panics if not called within the context of a task.
pub fn poll(&mut self, mut status_grab: impl FnMut() -> T) {
pub fn poll(&mut self, cx: &mut Context, mut status_grab: impl FnMut() -> T) {
loop {
match self.entries.poll() {
Ok(Async::Ready(Some((sender, interval)))) => {
match Pin::new(&mut self.entries).poll_next(cx) {
Poll::Ready(Some((sender, interval))) => {
let status = status_grab();
if sender.unbounded_send(status).is_ok() {
self.entries.push(YieldAfter {
@@ -74,33 +74,32 @@ impl<T> StatusSinks<T> {
// waken up and the moment it is polled, the period is actually not
// `interval` but `interval + <delay>`. We ignore this problem in
// practice.
delay: Delay::new(Instant::now() + interval),
delay: Delay::new(interval),
interval,
sender: Some(sender),
});
}
}
Err(()) |
Ok(Async::Ready(None)) |
Ok(Async::NotReady) => break,
Poll::Ready(None) |
Poll::Pending => break,
}
}
}
}
impl<T> Future for YieldAfter<T> {
type Item = (mpsc::UnboundedSender<T>, Duration);
type Error = ();
impl<T> futures::Future for YieldAfter<T> {
type Output = (mpsc::UnboundedSender<T>, Duration);
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.delay.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(())) => {
let sender = self.sender.take()
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
match Pin::new(&mut this.delay).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(()) => {
let sender = this.sender.take()
.expect("sender is always Some unless the future is finished; qed");
Ok(Async::Ready((sender, self.interval)))
},
Err(_) => Err(()),
Poll::Ready((sender, this.interval))
}
}
}
}
@@ -109,8 +108,9 @@ impl<T> Future for YieldAfter<T> {
mod tests {
use super::StatusSinks;
use futures::prelude::*;
use futures::sync::mpsc;
use futures::channel::mpsc;
use std::time::Duration;
use std::task::Poll;
#[test]
fn works() {
@@ -125,18 +125,18 @@ mod tests {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let mut val_order = 5;
runtime.spawn(futures::future::poll_fn(move || {
status_sinks.poll(|| { val_order += 1; val_order });
Ok(Async::NotReady)
runtime.spawn(futures::future::poll_fn(move |cx| {
status_sinks.poll(cx, || { val_order += 1; val_order });
Poll::<()>::Pending
}));
let done = rx
.into_future()
.and_then(|(item, rest)| {
.then(|(item, rest)| {
assert_eq!(item, Some(6));
rest.into_future()
})
.and_then(|(item, rest)| {
.then(|(item, rest)| {
assert_eq!(item, Some(7));
rest.into_future()
})
@@ -144,6 +144,6 @@ mod tests {
assert_eq!(item, Some(8));
});
runtime.block_on(done).unwrap();
runtime.block_on(done);
}
}