Upgrade tokio to 1.10 (#9575)

* Upgrade tokio to 1.10

* Fix test runner

* Try fix it

* Update Cargo.lock

* Review feedback

* ahhhh

* FML

* FMT

* Fix tests
This commit is contained in:
Bastian Köcher
2021-08-24 16:31:19 +02:00
committed by GitHub
parent d722c44248
commit f92e5c5e65
24 changed files with 143 additions and 378 deletions
+31 -34
View File
@@ -18,8 +18,7 @@
//! Service integration test utils.
use futures::{FutureExt as _, TryFutureExt as _};
use futures01::{Future, Poll, Stream};
use futures::{task::Poll, Future, FutureExt, TryFutureExt as _};
use log::{debug, info};
use parking_lot::Mutex;
use sc_client_api::{Backend, CallExecutor};
@@ -36,9 +35,9 @@ use sc_service::{
use sc_transaction_pool_api::TransactionPool;
use sp_blockchain::HeaderBackend;
use sp_runtime::{generic::BlockId, traits::Block as BlockT};
use std::{iter, net::Ipv4Addr, pin::Pin, sync::Arc, time::Duration};
use std::{iter, net::Ipv4Addr, pin::Pin, sync::Arc, task::Context, time::Duration};
use tempfile::TempDir;
use tokio::{prelude::FutureExt, runtime::Runtime, timer::Interval};
use tokio::{runtime::Runtime, time};
#[cfg(test)]
mod client;
@@ -57,7 +56,7 @@ struct TestNet<G, E, F, L, U> {
}
pub trait TestNetNode:
Clone + Future<Item = (), Error = sc_service::Error> + Send + 'static
Clone + Future<Output = Result<(), sc_service::Error>> + Send + 'static
{
type Block: BlockT;
type Backend: Backend<Self::Block>;
@@ -109,11 +108,10 @@ impl<TBl: BlockT, TBackend, TExec, TRtApi, TExPool> Clone
impl<TBl: BlockT, TBackend, TExec, TRtApi, TExPool> Future
for TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool>
{
type Item = ();
type Error = sc_service::Error;
type Output = Result<(), sc_service::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
futures::compat::Compat::new(&mut self.task_manager.lock().future()).poll()
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::new(&mut self.task_manager.lock().future()).poll(cx)
}
}
@@ -161,33 +159,36 @@ where
{
let full_nodes = self.full_nodes.clone();
let light_nodes = self.light_nodes.clone();
let interval = Interval::new_interval(Duration::from_millis(100))
.map_err(|_| ())
.for_each(move |_| {
let future = async move {
let mut interval = time::interval(Duration::from_millis(100));
loop {
interval.tick().await;
let full_ready = full_nodes
.iter()
.all(|&(ref id, ref service, _, _)| full_predicate(*id, service));
if !full_ready {
return Ok(())
continue
}
let light_ready = light_nodes
.iter()
.all(|&(ref id, ref service, _)| light_predicate(*id, service));
if !light_ready {
Ok(())
} else {
Err(())
if light_ready {
return
}
})
.timeout(MAX_WAIT_TIME);
}
};
match self.runtime.block_on(interval) {
Ok(()) => unreachable!("interval always fails; qed"),
Err(ref err) if err.is_inner() => (),
Err(_) => panic!("Waited for too long"),
if self
.runtime
.block_on(async move { time::timeout(MAX_WAIT_TIME, future).await })
.is_err()
{
panic!("Waited for too long");
}
}
}
@@ -306,11 +307,11 @@ where
light: impl Iterator<Item = impl FnOnce(Configuration) -> Result<L, Error>>,
authorities: impl Iterator<Item = (String, impl FnOnce(Configuration) -> Result<(F, U), Error>)>,
) {
let executor = self.runtime.executor();
let handle = self.runtime.handle().clone();
let task_executor: TaskExecutor = {
let executor = executor.clone();
let executor = handle.clone();
(move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>, _| {
executor.spawn(fut.unit_error().compat());
executor.spawn(fut.unit_error());
async {}
})
.into()
@@ -330,7 +331,7 @@ where
let (service, user_data) =
authority(node_config).expect("Error creating test node service");
executor.spawn(service.clone().map_err(|_| ()));
handle.spawn(service.clone().map_err(|_| ()));
let addr = addr
.with(multiaddr::Protocol::P2p(service.network().local_peer_id().clone().into()));
self.authority_nodes.push((self.nodes, service, user_data, addr));
@@ -350,7 +351,7 @@ where
let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
let (service, user_data) = full(node_config).expect("Error creating test node service");
executor.spawn(service.clone().map_err(|_| ()));
handle.spawn(service.clone().map_err(|_| ()));
let addr = addr
.with(multiaddr::Protocol::P2p(service.network().local_peer_id().clone().into()));
self.full_nodes.push((self.nodes, service, user_data, addr));
@@ -370,7 +371,7 @@ where
let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
let service = light(node_config).expect("Error creating test node service");
executor.spawn(service.clone().map_err(|_| ()));
handle.spawn(service.clone().map_err(|_| ()));
let addr = addr
.with(multiaddr::Protocol::P2p(service.network().local_peer_id().clone().into()));
self.light_nodes.push((self.nodes, service, addr));
@@ -406,7 +407,7 @@ pub fn connectivity<G, E, Fb, F, Lb, L>(
{
let temp = tempdir_with_prefix("substrate-connectivity-test");
let runtime = {
{
let mut network = TestNet::new(
&temp,
spec.clone(),
@@ -444,12 +445,8 @@ pub fn connectivity<G, E, Fb, F, Lb, L>(
connected == expected_light_connections
},
);
network.runtime
};
runtime.shutdown_now().wait().expect("Error shutting down runtime");
temp.close().expect("Error removing temp dir");
}
{