Upgrade from futures-preview to futures 0.3.1, and remove futures 0.1 where currently possible (#4083)

* Migrate node and node-template

* Migrate srml

* Simple changes

* Add async-std for interval

* Fix test-runtime warning

* Small changes

* move futures01 in core/rpc to dev-deps

* Change wasm CI builds

* Switch to async-std 1.0.1

* Remove async-std dep of network

* Add modified lockfile

* Fix node cli browser build

* Remove authority-discovery async-std dep

* Add Send + Sync to interval dyn stream
This commit is contained in:
Ashley
2019-11-22 13:06:23 +01:00
committed by Gavin Wood
parent 795701608c
commit 1735683cc9
57 changed files with 240 additions and 224 deletions
+1 -3
View File
@@ -13,8 +13,7 @@ kvdb = { git = "https://github.com/paritytech/parity-common", rev="b0317f649ab2c
derive_more = { version = "0.15.0" }
executor = { package = "substrate-executor", path = "executor" }
fnv = { version = "1.0.6" }
futures = { version = "0.1.29" }
futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] }
futures = { version = "0.3.1", features = ["compat"] }
hash-db = { version = "0.15.2" }
header-metadata = { package = "substrate-header-metadata", path = "header-metadata" }
hex-literal = { version = "0.2.1" }
@@ -38,4 +37,3 @@ client-db = { package = "substrate-client-db", path = "./db", features = ["kvdb-
test-client = { package = "substrate-test-runtime-client", path = "../test/utils/runtime/client" }
kvdb-memorydb = { git = "https://github.com/paritytech/parity-common", rev="b0317f649ab2c665b7987b8475878fc4d2e1f81d" }
panic-handler = { package = "substrate-panic-handler", path = "../primitives/panic-handler" }
+1 -2
View File
@@ -11,8 +11,7 @@ consensus = { package = "substrate-consensus-common", path = "../../primitives/c
derive_more = { version = "0.15.0" }
executor = { package = "substrate-executor", path = "../executor" }
fnv = { version = "1.0.6" }
futures = { version = "0.1.29" }
futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] }
futures = { version = "0.3.1" }
hash-db = { version = "0.15.2", default-features = false }
header-metadata = { package = "substrate-header-metadata", path = "../header-metadata" }
hex-literal = { version = "0.2.1" }
+1 -1
View File
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use futures03::channel::mpsc;
use futures::channel::mpsc;
use primitives::storage::StorageKey;
use state_machine::ExecutionStrategy;
use sr_primitives::{
+4 -4
View File
@@ -286,7 +286,7 @@ pub trait RemoteBlockchain<Block: BlockT>: Send + Sync {
#[cfg(test)]
pub mod tests {
use futures03::future::Ready;
use futures::future::Ready;
use parking_lot::Mutex;
use crate::error::Error as ClientError;
use test_primitives::{Block, Header, Extrinsic};
@@ -298,7 +298,7 @@ pub mod tests {
where
E: std::convert::From<&'static str>,
{
futures03::future::ready(Err("Not implemented on test node".into()))
futures::future::ready(Err("Not implemented on test node".into()))
}
impl Fetcher<Block> for OkCallFetcher {
@@ -321,7 +321,7 @@ pub mod tests {
}
fn remote_call(&self, _request: RemoteCallRequest<Header>) -> Self::RemoteCallResult {
futures03::future::ready(Ok((*self.lock()).clone()))
futures::future::ready(Ok((*self.lock()).clone()))
}
fn remote_changes(&self, _request: RemoteChangesRequest<Header>) -> Self::RemoteChangesResult {
@@ -332,4 +332,4 @@ pub mod tests {
not_implemented_in_tests()
}
}
}
}
+10 -10
View File
@@ -22,7 +22,7 @@ use std::{
};
use fnv::{FnvHashSet, FnvHashMap};
use futures03::channel::mpsc;
use futures::channel::mpsc;
use primitives::storage::{StorageKey, StorageData};
use sr_primitives::traits::Block as BlockT;
@@ -347,7 +347,7 @@ mod tests {
// given
let mut notifications = StorageNotifications::<Block>::default();
let child_filter = [(StorageKey(vec![4]), None)];
let mut recv = futures03::executor::block_on_stream(
let mut recv = futures::executor::block_on_stream(
notifications.listen(None, Some(&child_filter[..]))
);
@@ -382,13 +382,13 @@ mod tests {
// given
let mut notifications = StorageNotifications::<Block>::default();
let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))];
let mut recv1 = futures03::executor::block_on_stream(
let mut recv1 = futures::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![1])]), None)
);
let mut recv2 = futures03::executor::block_on_stream(
let mut recv2 = futures::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![2])]), None)
);
let mut recv3 = futures03::executor::block_on_stream(
let mut recv3 = futures::executor::block_on_stream(
notifications.listen(Some(&[]), Some(&child_filter))
);
@@ -429,16 +429,16 @@ mod tests {
let mut notifications = StorageNotifications::<Block>::default();
{
let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))];
let _recv1 = futures03::executor::block_on_stream(
let _recv1 = futures::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![1])]), None)
);
let _recv2 = futures03::executor::block_on_stream(
let _recv2 = futures::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![2])]), None)
);
let _recv3 = futures03::executor::block_on_stream(
let _recv3 = futures::executor::block_on_stream(
notifications.listen(None, None)
);
let _recv4 = futures03::executor::block_on_stream(
let _recv4 = futures::executor::block_on_stream(
notifications.listen(None, Some(&child_filter))
);
assert_eq!(notifications.listeners.len(), 2);
@@ -465,7 +465,7 @@ mod tests {
// given
let mut recv = {
let mut notifications = StorageNotifications::<Block>::default();
let recv = futures03::executor::block_on_stream(notifications.listen(None, None));
let recv = futures::executor::block_on_stream(notifications.listen(None, None));
// when
let changeset = vec![];
@@ -14,8 +14,8 @@ bytes = "0.4.12"
client-api = { package = "substrate-client-api", path = "../api" }
codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" }
derive_more = "0.15.0"
futures-preview = "0.3.0-alpha.19"
futures-timer = "0.4"
futures = "0.3.1"
futures-timer = "2.0"
keystore = { package = "substrate-keystore", path = "../keystore" }
libp2p = { version = "0.13.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
log = "0.4.8"
@@ -51,9 +51,8 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::task::{Context, Poll};
use futures::Future;
use futures_timer::Interval;
use futures::prelude::*;
use futures::{Future, FutureExt, Stream, StreamExt};
use futures_timer::Delay;
use authority_discovery_primitives::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair};
use client_api::blockchain::HeaderBackend;
@@ -68,6 +67,8 @@ use prost::Message;
use sr_primitives::generic::BlockId;
use sr_primitives::traits::{Block as BlockT, ProvideRuntimeApi};
type Interval = Box<dyn Stream<Item = ()> + Unpin + Send + Sync>;
mod error;
/// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs.
mod schema {
@@ -129,14 +130,14 @@ where
// Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node
// could restart at any point in time, one can not depend on the republishing process, thus publishing own
// external addresses should happen on an interval < 36h.
let publish_interval = Interval::new_at(
let publish_interval = interval_at(
Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME,
Duration::from_secs(12 * 60 * 60),
);
// External addresses of other authorities can change at any given point in time. The interval on which to query
// for external addresses of other authorities is a trade off between efficiency and performance.
let query_interval = Interval::new_at(
let query_interval = interval_at(
Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME,
Duration::from_secs(10 * 60),
);
@@ -455,6 +456,19 @@ fn hash_authority_id(id: &[u8]) -> Result<libp2p::kad::record::Key> {
.map_err(Error::HashingAuthorityId)
}
fn interval_at(start: Instant, duration: Duration) -> Interval {
let stream = futures::stream::unfold((), move |_| {
let wait_time = start.saturating_duration_since(Instant::now());
futures::future::join(
Delay::new(wait_time),
Delay::new(duration)
).map(|_| Some(((), ())))
}).map(drop);
Box::new(stream)
}
#[cfg(test)]
mod tests {
use super::*;
+1 -1
View File
@@ -6,7 +6,7 @@ edition = "2018"
[dependencies]
log = "0.4.8"
futures-preview = "0.3.0-alpha.19"
futures = "0.3.1"
codec = { package = "parity-scale-codec", version = "1.0.0" }
sr-primitives = { path = "../../primitives/sr-primitives" }
primitives = { package = "substrate-primitives", path = "../../primitives/core" }
+2 -3
View File
@@ -17,10 +17,9 @@ ansi_term = "0.12.1"
lazy_static = "1.4.0"
app_dirs = "1.2.1"
tokio = "0.1.22"
futures = "0.1.29"
futures03 = { package = "futures-preview", version = "=0.3.0-alpha.19", features = ["compat"] }
futures = { version = "0.3.1", features = ["compat"] }
futures01 = "0.1.29"
fdlimit = "0.1.1"
exit-future = "0.1.4"
serde_json = "1.0.41"
panic-handler = { package = "substrate-panic-handler", path = "../../primitives/panic-handler" }
client-api = { package = "substrate-client-api", path = "../api" }
+11 -9
View File
@@ -17,8 +17,7 @@
//! Console informant. Prints sync progress and block events. Runs on the calling thread.
use client_api::BlockchainEvents;
use futures::{Future, Stream};
use futures03::{StreamExt as _, TryStreamExt as _};
use futures::{StreamExt, TryStreamExt, FutureExt, future, compat::Stream01CompatExt};
use log::{info, warn};
use sr_primitives::traits::Header;
use service::AbstractService;
@@ -27,17 +26,18 @@ use std::time::Duration;
mod display;
/// Creates an informant in the form of a `Future` that must be polled regularly.
pub fn build(service: &impl AbstractService) -> impl Future<Item = (), Error = ()> {
pub fn build(service: &impl AbstractService) -> impl futures::Future<Output = ()> {
let client = service.client();
let mut display = display::InformantDisplay::new();
let display_notifications = service
.network_status(Duration::from_millis(5000))
.for_each(move |(net_status, _)| {
.compat()
.try_for_each(move |(net_status, _)| {
let info = client.info();
display.display(&info, net_status);
Ok(())
future::ok(())
});
let client = service.client();
@@ -46,7 +46,7 @@ pub fn build(service: &impl AbstractService) -> impl Future<Item = (), Error = (
Some((info.chain.best_number, info.chain.best_hash))
};
let display_block_import = client.import_notification_stream().map(|v| Ok::<_, ()>(v)).compat().for_each(move |n| {
let display_block_import = client.import_notification_stream().for_each(move |n| {
// detect and log reorganizations.
if let Some((ref last_num, ref last_hash)) = last_best {
if n.header.parent_hash() != last_hash && n.is_new_best {
@@ -74,9 +74,11 @@ pub fn build(service: &impl AbstractService) -> impl Future<Item = (), Error = (
}
info!(target: "substrate", "Imported #{} ({})", n.header.number(), n.hash);
Ok(())
future::ready(())
});
display_notifications.join(display_block_import)
.map(|((), ())| ())
future::join(
display_notifications,
display_block_import
).map(|_| ())
}
+11 -7
View File
@@ -61,8 +61,8 @@ pub use traits::{GetLogFilter, AugmentClap};
use app_dirs::{AppInfo, AppDataType};
use log::info;
use lazy_static::lazy_static;
use futures::{Async, Future};
use futures::{Future, FutureExt, TryFutureExt};
use futures01::{Async, Future as _};
use substrate_telemetry::TelemetryEndpoints;
/// default sub directory to store network config
@@ -102,7 +102,7 @@ pub struct VersionInfo {
/// Something that can be converted into an exit signal.
pub trait IntoExit {
/// Exit signal type.
type Exit: Future<Item=(),Error=()> + Send + 'static;
type Exit: Future<Output=()> + Unpin + Send + 'static;
/// Convert into exit signal.
fn into_exit(self) -> Self::Exit;
}
@@ -391,14 +391,16 @@ impl<'a> ParseAndPrepareExport<'a> {
// Note: while we would like the user to handle the exit themselves, we handle it here
// for backwards compatibility reasons.
let (exit_send, exit_recv) = std::sync::mpsc::channel();
let exit = exit.into_exit();
let exit = exit.into_exit()
.map(|_| Ok::<_, ()>(()))
.compat();
std::thread::spawn(move || {
let _ = exit.wait();
let _ = exit_send.send(());
});
let mut export_fut = builder(config)?.export_blocks(file, from.into(), to.map(Into::into), json);
let fut = futures::future::poll_fn(|| {
let fut = futures01::future::poll_fn(|| {
if exit_recv.try_recv().is_ok() {
return Ok(Async::Ready(()));
}
@@ -453,14 +455,16 @@ impl<'a> ParseAndPrepareImport<'a> {
// Note: while we would like the user to handle the exit themselves, we handle it here
// for backwards compatibility reasons.
let (exit_send, exit_recv) = std::sync::mpsc::channel();
let exit = exit.into_exit();
let exit = exit.into_exit()
.map(|_| Ok::<_, ()>(()))
.compat();
std::thread::spawn(move || {
let _ = exit.wait();
let _ = exit_send.send(());
});
let mut import_fut = builder(config)?.import_blocks(file);
let fut = futures::future::poll_fn(|| {
let fut = futures01::future::poll_fn(|| {
if exit_recv.try_recv().is_ok() {
return Ok(Async::Ready(()));
}
+2 -2
View File
@@ -14,9 +14,9 @@ client-api = { package = "substrate-client-api", path = "../../api" }
codec = { package = "parity-scale-codec", version = "1.0.0" }
consensus_common = { package = "substrate-consensus-common", path = "../../../primitives/consensus/common" }
derive_more = "0.15.0"
futures-preview = { version = "0.3.0-alpha.19", features = ["compat"] }
futures-timer = "0.4.0"
futures = { version = "0.3.1", features = ["compat"] }
futures01 = { package = "futures", version = "0.1" }
futures-timer = "0.4.0"
inherents = { package = "substrate-inherents", path = "../../../primitives/inherents" }
keystore = { package = "substrate-keystore", path = "../../keystore" }
log = "0.4.8"
+1 -1
View File
@@ -29,7 +29,7 @@ uncles = { package = "substrate-consensus-uncles", path = "../uncles" }
slots = { package = "substrate-consensus-slots", path = "../slots" }
sr-primitives = { path = "../../../primitives/sr-primitives" }
fork-tree = { path = "../../../utils/fork-tree" }
futures-preview = { version = "0.3.0-alpha.19", features = ["compat"] }
futures = { version = "0.3.1", features = ["compat"] }
futures01 = { package = "futures", version = "0.1" }
futures-timer = "0.4.0"
parking_lot = "0.9.0"
+1 -1
View File
@@ -15,6 +15,6 @@ inherents = { package = "substrate-inherents", path = "../../../primitives/inher
pow-primitives = { package = "substrate-consensus-pow-primitives", path = "../../../primitives/consensus/pow" }
consensus-common = { package = "substrate-consensus-common", path = "../../../primitives/consensus/common" }
log = "0.4.8"
futures-preview = { version = "0.3.0-alpha.19", features = ["compat"] }
futures = { version = "0.3.1", features = ["compat"] }
sp-timestamp = { path = "../../../primitives/timestamp" }
derive_more = "0.15.0"
+2 -2
View File
@@ -14,8 +14,8 @@ sr-primitives = { path = "../../../primitives/sr-primitives" }
substrate-telemetry = { path = "../../telemetry" }
consensus_common = { package = "substrate-consensus-common", path = "../../../primitives/consensus/common" }
inherents = { package = "substrate-inherents", path = "../../../primitives/inherents" }
futures-preview = "0.3.0-alpha.19"
futures-timer = "0.4.0"
futures = "0.3.1"
futures-timer = "2.0"
parking_lot = "0.9.0"
log = "0.4.8"
+1 -3
View File
@@ -192,12 +192,10 @@ pub trait SimpleSlotWorker<B: BlockT> {
remaining_duration,
).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e))),
Delay::new(remaining_duration)
.map_err(consensus_common::Error::FaultyTimer)
).map(|v| match v {
futures::future::Either::Left((b, _)) => b.map(|b| (b, claim)),
futures::future::Either::Right((Ok(_), _)) =>
futures::future::Either::Right(_) =>
Err(consensus_common::Error::ClientImport("Timeout in the Slots proposer".into())),
futures::future::Either::Right((Err(err), _)) => Err(err),
});
let block_import_params_maker = self.block_import_params();
@@ -137,8 +137,7 @@ impl<SC: SlotCompatible + Unpin> Stream for Slots<SC> {
if let Some(ref mut inner_delay) = self.inner_delay {
match Future::poll(Pin::new(inner_delay), cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(Error::FaultyTimer(err)))),
Poll::Ready(Ok(())) => {}
Poll::Ready(()) => {}
}
}
+1 -1
View File
@@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
fork-tree = { path = "../../utils/fork-tree" }
futures = "0.1.29"
futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] }
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
log = "0.4.8"
parking_lot = "0.9.0"
tokio-executor = "0.1.8"
+1 -1
View File
@@ -15,7 +15,7 @@ parking_lot = "0.9.0"
bitflags = "1.2.0"
fnv = "1.0.6"
futures = "0.1.29"
futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] }
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
futures-timer = "0.4.0"
linked-hash-map = "0.5.2"
linked_hash_set = "0.1.3"
+2 -2
View File
@@ -27,7 +27,7 @@ use log::{debug, trace, error};
use std::collections::hash_map::Entry;
use std::time::{Duration, Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use futures_timer::Interval;
use crate::utils::interval;
/// Time after we disconnect from a node before we purge its information from the cache.
const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60);
@@ -76,7 +76,7 @@ impl<TSubstream> DebugInfoBehaviour<TSubstream> {
ping: Ping::new(PingConfig::new()),
identify,
nodes_info: FnvHashMap::default(),
garbage_collect: Box::new(Interval::new(GARBAGE_COLLECT_INTERVAL).map(|()| Ok(())).compat()),
garbage_collect: Box::new(interval(GARBAGE_COLLECT_INTERVAL).map(|()| Ok(())).compat()),
}
}
+1
View File
@@ -177,6 +177,7 @@ mod on_demand_layer;
mod protocol;
mod service;
mod transport;
mod utils;
pub mod config;
pub mod error;
+3 -2
View File
@@ -16,6 +16,7 @@
use crate::{DiscoveryNetBehaviour, config::ProtocolId};
use crate::legacy_proto::{LegacyProto, LegacyProtoOut};
use crate::utils::interval;
use bytes::BytesMut;
use futures::prelude::*;
use futures03::{StreamExt as _, TryStreamExt as _};
@@ -430,8 +431,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let behaviour = LegacyProto::new(protocol_id, versions, peerset);
let protocol = Protocol {
tick_timeout: Box::new(futures_timer::Interval::new(TICK_TIMEOUT).map(|v| Ok::<_, ()>(v)).compat()),
propagate_timeout: Box::new(futures_timer::Interval::new(PROPAGATE_TIMEOUT).map(|v| Ok::<_, ()>(v)).compat()),
tick_timeout: Box::new(interval(TICK_TIMEOUT).map(|v| Ok::<_, ()>(v)).compat()),
propagate_timeout: Box::new(interval(PROPAGATE_TIMEOUT).map(|v| Ok::<_, ()>(v)).compat()),
config,
context_data: ContextData {
peers: HashMap::new(),
+25
View File
@@ -0,0 +1,25 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::time::Duration;
use futures03::{FutureExt, Stream, StreamExt, stream::unfold};
use futures_timer::Delay;
pub fn interval(duration: Duration) -> impl Stream<Item=()> + Unpin {
unfold((), move |_| {
Delay::new(duration).map(|_| Some(((), ())))
}).map(drop)
}
+2 -2
View File
@@ -12,8 +12,8 @@ client-api = { package = "substrate-client-api", path = "../api" }
sr-api = { path = "../../primitives/sr-api" }
fnv = "1.0.6"
futures01 = { package = "futures", version = "0.1" }
futures-preview = "0.3.0-alpha.19"
futures-timer = "0.4.0"
futures = "0.3.1"
futures-timer = "2.0"
log = "0.4.8"
threadpool = "1.7"
num_cpus = "1.10"
+2 -2
View File
@@ -10,7 +10,7 @@ client-api = { package = "substrate-client-api", path = "../api" }
client = { package = "substrate-client", path = "../" }
sr-api = { path = "../../primitives/sr-api" }
codec = { package = "parity-scale-codec", version = "1.0.0" }
futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] }
futures = { version = "0.3.1", features = ["compat"] }
jsonrpc-pubsub = "14.0.3"
log = "0.4.8"
primitives = { package = "substrate-primitives", path = "../../primitives/core" }
@@ -29,7 +29,7 @@ parking_lot = { version = "0.9.0" }
[dev-dependencies]
assert_matches = "1.3.0"
futures = "0.1.29"
futures01 = { package = "futures", version = "0.1.29" }
network = { package = "substrate-network", path = "../network" }
rustc-hex = "2.0.1"
sr-io = { path = "../../primitives/sr-io" }
+1 -1
View File
@@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
codec = { package = "parity-scale-codec", version = "1.0.0" }
derive_more = "0.15.0"
futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] }
futures = { version = "0.3.1", features = ["compat"] }
jsonrpc-core = "14.0.3"
jsonrpc-core-client = "14.0.3"
jsonrpc-derive = "14.0.3"
+1 -1
View File
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use jsonrpc_core::futures::prelude::*;
use futures03::{channel::oneshot, compat::Compat};
use futures::{channel::oneshot, compat::Compat};
/// Wraps around `oneshot::Receiver` and adjusts the error type to produce an internal error if the
/// sender gets dropped.
+2 -2
View File
@@ -20,7 +20,7 @@
mod tests;
use std::{sync::Arc, convert::TryInto};
use futures03::future::{FutureExt, TryFutureExt};
use futures::future::{FutureExt, TryFutureExt};
use log::warn;
use client::Client;
@@ -30,7 +30,7 @@ use rpc::futures::{
Sink, Future,
future::result,
};
use futures03::{StreamExt as _, compat::Compat, future::ready};
use futures::{StreamExt as _, compat::Compat, future::ready};
use api::Subscriptions;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use codec::{Encode, Decode};
@@ -17,7 +17,7 @@
//! Blockchain API backend for light nodes.
use std::sync::Arc;
use futures03::{future::ready, FutureExt, TryFutureExt};
use futures::{future::ready, FutureExt, TryFutureExt};
use rpc::futures::future::{result, Future, Either};
use api::Subscriptions;
+1 -1
View File
@@ -23,7 +23,7 @@ mod chain_light;
mod tests;
use std::sync::Arc;
use futures03::{future, StreamExt as _, TryStreamExt as _};
use futures::{future, StreamExt, TryStreamExt};
use log::warn;
use rpc::{
Result as RpcResult,
+1 -1
View File
@@ -19,7 +19,7 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::ops::Range;
use futures03::{future, StreamExt as _, TryStreamExt as _};
use futures::{future, StreamExt as _, TryStreamExt as _};
use log::warn;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use rpc::{
@@ -21,7 +21,7 @@ use std::{
collections::{HashSet, HashMap, hash_map::Entry},
};
use codec::Decode;
use futures03::{
use futures::{
future::{ready, Either},
channel::oneshot::{channel, Sender},
FutureExt, TryFutureExt,
@@ -753,7 +753,7 @@ mod tests {
#[test]
fn maybe_share_remote_request_shares_request() {
type UnreachableFuture = futures03::future::Ready<Result<u32, Error>>;
type UnreachableFuture = futures::future::Ready<Result<u32, Error>>;
let shared_requests = SimpleSubscriptions::default();
+1 -1
View File
@@ -20,7 +20,7 @@ use self::error::Error;
use std::sync::Arc;
use assert_matches::assert_matches;
use futures::stream::Stream;
use futures01::stream::Stream;
use primitives::storage::well_known_keys;
use sr_io::hashing::blake2_256;
use test_client::{
+1 -1
View File
@@ -19,7 +19,7 @@
#[cfg(test)]
mod tests;
use futures03::{channel::{mpsc, oneshot}, compat::Compat};
use futures::{channel::{mpsc, oneshot}, compat::Compat};
use api::Receiver;
use sr_primitives::traits::{self, Header as HeaderT};
use self::error::Result;
+3 -3
View File
@@ -20,7 +20,7 @@ use network::{self, PeerId};
use network::config::Roles;
use test_client::runtime::Block;
use assert_matches::assert_matches;
use futures03::{prelude::*, channel::mpsc};
use futures::{prelude::*, channel::mpsc};
use std::thread;
struct Status {
@@ -46,7 +46,7 @@ fn api<T: Into<Option<Status>>>(sync: T) -> System<Block> {
let should_have_peers = !status.is_dev;
let (tx, rx) = mpsc::unbounded();
thread::spawn(move || {
futures03::executor::block_on(rx.for_each(move |request| {
futures::executor::block_on(rx.for_each(move |request| {
match request {
Request::Health(sender) => {
let _ = sender.send(Health {
@@ -231,4 +231,4 @@ fn system_node_roles() {
wait_receiver(api(None).system_node_roles()),
vec![NodeRole::Authority]
);
}
}
+1 -1
View File
@@ -16,7 +16,7 @@ wasmtime = [
[dependencies]
derive_more = "0.15.0"
futures = "0.1.29"
futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] }
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
parking_lot = "0.9.0"
lazy_static = "1.4.0"
log = "0.4.8"
+1 -1
View File
@@ -11,7 +11,7 @@ futures = "0.1.29"
log = "0.4.8"
env_logger = "0.7.0"
fdlimit = "0.1.1"
futures03 = { package = "futures-preview", version = "=0.3.0-alpha.19", features = ["compat"] }
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
service = { package = "substrate-service", path = "../../service", default-features = false }
network = { package = "substrate-network", path = "../../network" }
consensus = { package = "substrate-consensus-common", path = "../../../primitives/consensus/common" }
+1 -1
View File
@@ -21,7 +21,7 @@ use std::{
panic::UnwindSafe, result, cell::RefCell,
};
use log::{info, trace, warn};
use futures03::channel::mpsc;
use futures::channel::mpsc;
use parking_lot::{Mutex, RwLock};
use codec::{Encode, Decode};
use hash_db::{Hasher, Prefix};
+1 -1
View File
@@ -177,7 +177,7 @@ pub fn future_header<Block: BlockT, F: Fetcher<Block>>(
fetcher: &F,
id: BlockId<Block>,
) -> impl Future<Output = Result<Option<Block::Header>, ClientError>> {
use futures03::future::{ready, Either, FutureExt};
use futures::future::{ready, Either, FutureExt};
match blockchain.header(id) {
Ok(LocalOrRemote::Remote(request)) => Either::Left(
+2 -2
View File
@@ -9,8 +9,8 @@ edition = "2018"
bytes = "0.4.12"
parking_lot = "0.9.0"
futures01 = { package = "futures", version = "0.1" }
futures-preview = { version = "0.3.0-alpha.19", features = ["compat"] }
futures-timer = "0.4.0"
futures = { version = "0.3.1", features = ["compat"] }
futures-timer = "2.0.0"
libp2p = { version = "0.13.0", default-features = false, features = ["libp2p-websocket"] }
log = "0.4.8"
rand = "0.7.2"
@@ -254,11 +254,7 @@ where TTrans::Output: Sink<BytesMut, Error = TSinkErr>
if let Some(timeout) = self.timeout.as_mut() {
match Future::poll(Pin::new(timeout), cx) {
Poll::Pending => {},
Poll::Ready(Err(err)) => {
self.timeout = None;
warn!(target: "telemetry", "Connection timeout error for {} {:?}", my_addr, err);
}
Poll::Ready(Ok(_)) => {
Poll::Ready(()) => {
self.timeout = None;
return Poll::Ready(Err(ConnectionError::Timeout))
}
+1 -1
View File
@@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
codec = { package = "parity-scale-codec", version = "1.0.0" }
derive_more = "0.15.0"
futures = { version = "0.3.0", features = ["thread-pool"] }
futures = { version = "0.3.1", features = ["thread-pool"] }
log = "0.4.8"
parking_lot = "0.9.0"
primitives = { package = "substrate-primitives", path = "../../primitives/core" }
@@ -6,7 +6,7 @@ edition = "2018"
[dependencies]
derive_more = "0.15.0"
futures-preview = "0.3.0-alpha.19"
futures = "0.3.1"
log = "0.4.8"
parking_lot = "0.9.0"
serde = { version = "1.0.101", features = ["derive"] }