diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 4e3f5e1630..9418f373fa 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -954,6 +954,20 @@ name = "futures" version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "futures-channel-preview" +version = "0.3.0-alpha.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-core-preview" +version = "0.3.0-alpha.17" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "futures-cpupool" version = "0.1.8" @@ -963,6 +977,58 @@ dependencies = [ "num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "futures-executor-preview" +version = "0.3.0-alpha.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-channel-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-io-preview" +version = "0.3.0-alpha.17" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "futures-preview" +version = "0.3.0-alpha.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-channel-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-executor-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-io-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-sink-preview" +version = "0.3.0-alpha.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-util-preview" +version = "0.3.0-alpha.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-channel-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-io-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "gcc" version = "0.3.55" @@ -2703,6 +2769,11 @@ name = "percent-encoding" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "pin-utils" +version = "0.1.0-alpha.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "pkg-config" version = "0.3.14" @@ -4125,6 +4196,7 @@ dependencies = [ "exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "names 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4152,7 +4224,7 @@ version = "2.0.0" dependencies = [ "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "hash-db 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.0 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)", @@ -4206,6 +4278,7 @@ version = "2.0.0" dependencies = [ "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4247,6 +4320,7 @@ version = "2.0.0" dependencies = [ "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "merlin 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4383,6 +4457,7 @@ dependencies = [ "finality-grandpa 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "fork-tree 2.0.0", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4463,6 +4538,7 @@ dependencies = [ "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "fork-tree 2.0.0", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "linked_hash_set 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4585,6 +4661,7 @@ dependencies = [ "assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 12.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client 12.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-derive 12.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4647,6 +4724,7 @@ dependencies = [ "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "node-executor 2.0.0", @@ -4748,7 +4826,7 @@ dependencies = [ name = "substrate-test-client" version = "2.0.0" dependencies = [ - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "hash-db 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 2.0.0", @@ -5933,7 +6011,14 @@ dependencies = [ "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "45dc39533a6cae6da2b56da48edae506bb767ec07370f86f70fc062e9d435869" +"checksum futures-channel-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "21c71ed547606de08e9ae744bb3c6d80f5627527ef31ecf2a7210d0e67bc8fae" +"checksum futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "4b141ccf9b7601ef987f36f1c0d9522f76df3bba1cf2e63bfacccc044c4558f5" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" +"checksum futures-executor-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "87ba260fe51080ba37f063ad5b0732c4ff1f737ea18dcb67833d282cdc2c6f14" +"checksum futures-io-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "082e402605fcb8b1ae1e5ba7d7fdfd3e31ef510e2a8367dd92927bb41ae41b3a" +"checksum futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "bf25f91c8a9a1f64c451e91b43ba269ed359b9f52d35ed4b3ce3f9c842435867" +"checksum futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "4309a25a1069a1f3c10647b227b9afe6722b67a030d3f00a9cbdc171fc038de4" +"checksum futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "af8198c48b222f02326940ce2b3aa9e6e91a32886eeaad7ca3b8e4c70daa3f4e" "checksum gcc 0.3.55 (registry+https://github.com/rust-lang/crates.io-index)" = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" "checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" "checksum generic-array 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)" = "fceb69994e330afed50c93524be68c42fa898c2d9fd4ee8da03bd7363acd26f2" @@ -6082,6 +6167,7 @@ dependencies = [ "checksum pbkdf2 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "006c038a43a45995a9670da19e67600114740e8511d4333bf97a56e66a7542d9" "checksum peeking_take_while 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" +"checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" "checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c" "checksum ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e3cbf9f658cdb5000fcf6f362b8ea2ba154b9f146a61c7a20d647034c6b6561b" "checksum pretty_assertions 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3a029430f0d744bc3d15dd474d591bed2402b645d024583082b9f63bb936dac6" diff --git a/substrate/core/cli/Cargo.toml b/substrate/core/cli/Cargo.toml index 6ca50ba5f8..bf583a3b7d 100644 --- a/substrate/core/cli/Cargo.toml +++ b/substrate/core/cli/Cargo.toml @@ -18,6 +18,7 @@ lazy_static = "1.3" app_dirs = "1.2" tokio = "0.1.7" futures = "0.1.17" +futures03 = { package = "futures-preview", version = "0.3.0-alpha.17", features = ["compat"] } fdlimit = "0.1" exit-future = "0.1" serde_json = "1.0" diff --git a/substrate/core/cli/src/informant.rs b/substrate/core/cli/src/informant.rs index f89f8b23f5..d6bbf4831d 100644 --- a/substrate/core/cli/src/informant.rs +++ b/substrate/core/cli/src/informant.rs @@ -20,6 +20,7 @@ use ansi_term::Colour; use std::fmt; use std::time; use futures::{Future, Stream}; +use futures03::{StreamExt as _, TryStreamExt as _}; use service::{Service, Components}; use tokio::runtime::TaskExecutor; use network::SyncState; @@ -81,7 +82,7 @@ where C: Components { Some((info.chain.best_number, info.chain.best_hash)) }; - let display_block_import = client.import_notification_stream().for_each(move |n| { + let display_block_import = client.import_notification_stream().map(|v| Ok::<_, ()>(v)).compat().for_each(move |n| { // detect and log reorganizations. if let Some((ref last_num, ref last_hash)) = last { if n.header.parent_hash() != last_hash { diff --git a/substrate/core/client/Cargo.toml b/substrate/core/client/Cargo.toml index 444c76ea9c..29951b5c48 100644 --- a/substrate/core/client/Cargo.toml +++ b/substrate/core/client/Cargo.toml @@ -10,7 +10,7 @@ fnv = { version = "1.0", optional = true } log = { version = "0.4", optional = true } parking_lot = { version = "0.8.0", optional = true } hex = { package = "hex-literal", version = "0.1", optional = true } -futures = { version = "0.1.17", optional = true } +futures-preview = { version = "0.3.0-alpha.17", optional = true } consensus = { package = "substrate-consensus-common", path = "../consensus/common", optional = true } executor = { package = "substrate-executor", path = "../executor", optional = true } state-machine = { package = "substrate-state-machine", path = "../state-machine", optional = true } @@ -47,7 +47,7 @@ std = [ "fnv", "log", "hex", - "futures", + "futures-preview", "executor", "state-machine", "keyring", diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index 8389c95ea8..f2802e75fc 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -21,7 +21,7 @@ use std::{ panic::UnwindSafe, result, cell::RefCell, rc::Rc, }; use crate::error::Error; -use futures::sync::mpsc; +use futures::channel::mpsc; use parking_lot::{Mutex, RwLock}; use primitives::NativeOrEncoded; use runtime_primitives::{ diff --git a/substrate/core/client/src/lib.rs b/substrate/core/client/src/lib.rs index 8062fae500..67cfdd4a64 100644 --- a/substrate/core/client/src/lib.rs +++ b/substrate/core/client/src/lib.rs @@ -58,7 +58,7 @@ pub use crate::client::{ new_with_backend, new_in_mem, BlockBody, BlockStatus, ImportNotifications, FinalityNotifications, BlockchainEvents, - BlockImportNotification, Client, ClientInfo, ExecutionStrategies, + BlockImportNotification, Client, ClientInfo, ExecutionStrategies, FinalityNotification, LongestChain, }; #[cfg(feature = "std")] diff --git a/substrate/core/client/src/light/backend.rs b/substrate/core/client/src/light/backend.rs index 96c0c07f67..6f6bde2418 100644 --- a/substrate/core/client/src/light/backend.rs +++ b/substrate/core/client/src/light/backend.rs @@ -19,7 +19,6 @@ use std::collections::HashMap; use std::sync::{Arc, Weak}; -use futures::{Future, IntoFuture}; use parking_lot::{RwLock, Mutex}; use runtime_primitives::{generic::BlockId, Justification, StorageOverlay, ChildrenStorageOverlay}; @@ -359,14 +358,15 @@ where *self.cached_header.write() = Some(cached_header); } - self.fetcher.upgrade().ok_or(ClientError::NotAvailableOnLightClient)? - .remote_read(RemoteReadRequest { - block: self.block, - header: header.expect("if block above guarantees that header is_some(); qed"), - key: key.to_vec(), - retry_count: None, - }) - .into_future().wait() + futures::executor::block_on( + self.fetcher.upgrade().ok_or(ClientError::NotAvailableOnLightClient)? + .remote_read(RemoteReadRequest { + block: self.block, + header: header.expect("if block above guarantees that header is_some(); qed"), + key: key.to_vec(), + retry_count: None, + }) + ) } fn child_storage(&self, _storage_key: &[u8], _key: &[u8]) -> ClientResult>> { diff --git a/substrate/core/client/src/light/blockchain.rs b/substrate/core/client/src/light/blockchain.rs index e3d9c55a6a..6bd4c787d5 100644 --- a/substrate/core/client/src/light/blockchain.rs +++ b/substrate/core/client/src/light/blockchain.rs @@ -18,7 +18,6 @@ //! blocks. CHT roots are stored for headers of ancient blocks. use std::{sync::{Weak, Arc}, collections::HashMap}; -use futures::{Future, IntoFuture}; use parking_lot::Mutex; use runtime_primitives::{Justification, generic::BlockId}; @@ -122,14 +121,15 @@ impl BlockchainHeaderBackend for Blockchain where Bloc return Ok(None); } - self.fetcher().upgrade().ok_or(ClientError::NotAvailableOnLightClient)? - .remote_header(RemoteHeaderRequest { - cht_root: self.storage.header_cht_root(cht::size(), number)?, - block: number, - retry_count: None, + futures::executor::block_on( + self.fetcher().upgrade() + .ok_or(ClientError::NotAvailableOnLightClient)? + .remote_header(RemoteHeaderRequest { + cht_root: self.storage.header_cht_root(cht::size(), number)?, + block: number, + retry_count: None, }) - .into_future().wait() - .map(Some) + ).map(Some) } } } @@ -158,13 +158,13 @@ impl BlockchainBackend for Blockchain where Block: Blo None => return Ok(None), }; - self.fetcher().upgrade().ok_or(ClientError::NotAvailableOnLightClient)? - .remote_body(RemoteBodyRequest { - header, - retry_count: None, - }) - .into_future().wait() - .map(Some) + futures::executor::block_on( + self.fetcher().upgrade().ok_or(ClientError::NotAvailableOnLightClient)? + .remote_body(RemoteBodyRequest { + header, + retry_count: None, + }) + ).map(Some) } fn justification(&self, _id: BlockId) -> ClientResult> { diff --git a/substrate/core/client/src/light/call_executor.rs b/substrate/core/client/src/light/call_executor.rs index 3d77492860..faa7c10def 100644 --- a/substrate/core/client/src/light/call_executor.rs +++ b/substrate/core/client/src/light/call_executor.rs @@ -21,7 +21,6 @@ use std::{ collections::HashSet, sync::Arc, panic::UnwindSafe, result, marker::PhantomData, cell::RefCell, rc::Rc, }; -use futures::{IntoFuture, Future}; use parity_codec::{Encode, Decode}; use primitives::{offchain, H256, Blake2Hasher, convert_hash, NativeOrEncoded}; @@ -100,13 +99,13 @@ where let block_hash = self.blockchain.expect_block_hash_from_id(id)?; let block_header = self.blockchain.expect_header(id.clone())?; - self.fetcher.remote_call(RemoteCallRequest { + futures::executor::block_on(self.fetcher.remote_call(RemoteCallRequest { block: block_hash, header: block_header, method: method.into(), call_data: call_data.to_vec(), retry_count: None, - }).into_future().wait() + })) } fn contextual_call< diff --git a/substrate/core/client/src/light/fetcher.rs b/substrate/core/client/src/light/fetcher.rs index c77ebcd0fd..dd718001cd 100644 --- a/substrate/core/client/src/light/fetcher.rs +++ b/substrate/core/client/src/light/fetcher.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::collections::BTreeMap; use std::marker::PhantomData; -use futures::IntoFuture; +use std::future::Future; use hash_db::{HashDB, Hasher}; use parity_codec::{Decode, Encode}; @@ -141,15 +141,15 @@ pub struct RemoteBodyRequest { /// is correct (see FetchedDataChecker) and return already checked data. pub trait Fetcher: Send + Sync { /// Remote header future. - type RemoteHeaderResult: IntoFuture; + type RemoteHeaderResult: Future>; /// Remote storage read future. - type RemoteReadResult: IntoFuture>, Error = ClientError>; + type RemoteReadResult: Future>, ClientError>>; /// Remote call result future. - type RemoteCallResult: IntoFuture, Error = ClientError>; + type RemoteCallResult: Future, ClientError>>; /// Remote changes result future. - type RemoteChangesResult: IntoFuture, u32)>, Error = ClientError>; + type RemoteChangesResult: Future, u32)>, ClientError>>; /// Remote block body result future. - type RemoteBodyResult: IntoFuture, Error = ClientError>; + type RemoteBodyResult: Future, ClientError>>; /// Fetch remote header. fn remote_header(&self, request: RemoteHeaderRequest) -> Self::RemoteHeaderResult; @@ -484,7 +484,7 @@ impl<'a, H, Number, Hash> ChangesTrieRootsStorage for RootsStorage<'a #[cfg(test)] pub mod tests { - use futures::future::{ok, err, FutureResult}; + use futures::future::Ready; use parking_lot::Mutex; use parity_codec::Decode; use crate::client::tests::prepare_client_with_key_changes; @@ -508,19 +508,19 @@ pub mod tests { pub type OkCallFetcher = Mutex>; - fn not_implemented_in_tests() -> FutureResult + fn not_implemented_in_tests() -> Ready> where E: std::convert::From<&'static str>, { - err("Not implemented on test node".into()) + futures::future::ready(Err("Not implemented on test node".into())) } impl Fetcher for OkCallFetcher { - type RemoteHeaderResult = FutureResult; - type RemoteReadResult = FutureResult>, ClientError>; - type RemoteCallResult = FutureResult, ClientError>; - type RemoteChangesResult = FutureResult, u32)>, ClientError>; - type RemoteBodyResult = FutureResult, ClientError>; + type RemoteHeaderResult = Ready>; + type RemoteReadResult = Ready>, ClientError>>; + type RemoteCallResult = Ready, ClientError>>; + type RemoteChangesResult = Ready, u32)>, ClientError>>; + type RemoteBodyResult = Ready, ClientError>>; fn remote_header(&self, _request: RemoteHeaderRequest
) -> Self::RemoteHeaderResult { not_implemented_in_tests() @@ -535,7 +535,7 @@ pub mod tests { } fn remote_call(&self, _request: RemoteCallRequest
) -> Self::RemoteCallResult { - ok((*self.lock()).clone()) + futures::future::ready(Ok((*self.lock()).clone())) } fn remote_changes(&self, _request: RemoteChangesRequest
) -> Self::RemoteChangesResult { diff --git a/substrate/core/client/src/notifications.rs b/substrate/core/client/src/notifications.rs index 931a40f20d..bfd97df95c 100644 --- a/substrate/core/client/src/notifications.rs +++ b/substrate/core/client/src/notifications.rs @@ -22,7 +22,7 @@ use std::{ }; use fnv::{FnvHashSet, FnvHashMap}; -use futures::sync::mpsc; +use futures::channel::mpsc; use primitives::storage::{StorageKey, StorageData}; use runtime_primitives::traits::Block as BlockT; @@ -309,7 +309,6 @@ impl StorageNotifications { mod tests { use runtime_primitives::testing::{H256 as Hash, Block as RawBlock, ExtrinsicWrapper}; use super::*; - use futures::Stream; use std::iter::{empty, Empty}; type TestChangeSet = ( @@ -348,7 +347,9 @@ mod tests { // given let mut notifications = StorageNotifications::::default(); let child_filter = [(StorageKey(vec![4]), None)]; - let mut recv = notifications.listen(None, Some(&child_filter[..])).wait(); + let mut recv = futures::executor::block_on_stream( + notifications.listen(None, Some(&child_filter[..])) + ); // when let changeset = vec![ @@ -367,13 +368,13 @@ mod tests { ); // then - assert_eq!(recv.next().unwrap(), Ok((Hash::from_low_u64_be(1), (vec![ + assert_eq!(recv.next().unwrap(), (Hash::from_low_u64_be(1), (vec![ (StorageKey(vec![2]), Some(StorageData(vec![3]))), (StorageKey(vec![3]), None), ], vec![(StorageKey(vec![4]), vec![ (StorageKey(vec![5]), Some(StorageData(vec![4]))), (StorageKey(vec![6]), None), - ])]).into()))); + ])]).into())); } #[test] @@ -381,9 +382,15 @@ mod tests { // given let mut notifications = StorageNotifications::::default(); let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))]; - let mut recv1 = notifications.listen(Some(&[StorageKey(vec![1])]), None).wait(); - let mut recv2 = notifications.listen(Some(&[StorageKey(vec![2])]), None).wait(); - let mut recv3 = notifications.listen(Some(&[]), Some(&child_filter)).wait(); + let mut recv1 = futures::executor::block_on_stream( + notifications.listen(Some(&[StorageKey(vec![1])]), None) + ); + let mut recv2 = futures::executor::block_on_stream( + notifications.listen(Some(&[StorageKey(vec![2])]), None) + ); + let mut recv3 = futures::executor::block_on_stream( + notifications.listen(Some(&[]), Some(&child_filter)) + ); // when let changeset = vec![ @@ -403,16 +410,16 @@ mod tests { ); // then - assert_eq!(recv1.next().unwrap(), Ok((Hash::from_low_u64_be(1), (vec![ + assert_eq!(recv1.next().unwrap(), (Hash::from_low_u64_be(1), (vec![ (StorageKey(vec![1]), None), - ], vec![]).into()))); - assert_eq!(recv2.next().unwrap(), Ok((Hash::from_low_u64_be(1), (vec![ + ], vec![]).into())); + assert_eq!(recv2.next().unwrap(), (Hash::from_low_u64_be(1), (vec![ (StorageKey(vec![2]), Some(StorageData(vec![3]))), - ], vec![]).into()))); - assert_eq!(recv3.next().unwrap(), Ok((Hash::from_low_u64_be(1), (vec![], + ], vec![]).into())); + assert_eq!(recv3.next().unwrap(), (Hash::from_low_u64_be(1), (vec![], vec![ (StorageKey(vec![4]), vec![(StorageKey(vec![5]), Some(StorageData(vec![4])))]), - ]).into()))); + ]).into())); } @@ -422,10 +429,18 @@ mod tests { let mut notifications = StorageNotifications::::default(); { let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))]; - let _recv1 = notifications.listen(Some(&[StorageKey(vec![1])]), None).wait(); - let _recv2 = notifications.listen(Some(&[StorageKey(vec![2])]), None).wait(); - let _recv3 = notifications.listen(None, None).wait(); - let _recv4 = notifications.listen(None, Some(&child_filter)).wait(); + let _recv1 = futures::executor::block_on_stream( + notifications.listen(Some(&[StorageKey(vec![1])]), None) + ); + let _recv2 = futures::executor::block_on_stream( + notifications.listen(Some(&[StorageKey(vec![2])]), None) + ); + let _recv3 = futures::executor::block_on_stream( + notifications.listen(None, None) + ); + let _recv4 = futures::executor::block_on_stream( + notifications.listen(None, Some(&child_filter)) + ); assert_eq!(notifications.listeners.len(), 2); assert_eq!(notifications.wildcard_listeners.len(), 2); assert_eq!(notifications.child_listeners.len(), 1); @@ -450,7 +465,7 @@ mod tests { // given let mut recv = { let mut notifications = StorageNotifications::::default(); - let recv = notifications.listen(None, None).wait(); + let recv = futures::executor::block_on_stream(notifications.listen(None, None)); // when let changeset = vec![]; diff --git a/substrate/core/consensus/aura/Cargo.toml b/substrate/core/consensus/aura/Cargo.toml index 6154a991aa..03ddf79be3 100644 --- a/substrate/core/consensus/aura/Cargo.toml +++ b/substrate/core/consensus/aura/Cargo.toml @@ -25,6 +25,7 @@ parking_lot = "0.8.0" log = "0.4" [dev-dependencies] +futures03 = { package = "futures-preview", version = "0.3.0-alpha.17", features = ["compat"] } keyring = { package = "substrate-keyring", path = "../../keyring" } substrate-executor = { path = "../../executor" } network = { package = "substrate-network", path = "../../network", features = ["test-helpers"]} diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index c1a0a6381c..94b5f97ffc 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -709,6 +709,7 @@ pub fn import_queue( mod tests { use super::*; use futures::{Async, stream::Stream as _}; + use futures03::{StreamExt as _, TryStreamExt as _}; use consensus_common::NoNetwork as DummyOracle; use network::test::*; use network::test::{Block as TestBlock, PeersClient, PeersFullClient}; @@ -838,6 +839,7 @@ mod tests { let environ = Arc::new(DummyFactory(client.clone())); import_notifications.push( client.import_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() .take_while(|n| Ok(!(n.origin != BlockOrigin::Own && n.header.number() < &5))) .for_each(move |_| Ok(())) ); diff --git a/substrate/core/consensus/babe/Cargo.toml b/substrate/core/consensus/babe/Cargo.toml index 4954a7c552..29c706dd13 100644 --- a/substrate/core/consensus/babe/Cargo.toml +++ b/substrate/core/consensus/babe/Cargo.toml @@ -28,6 +28,7 @@ rand = "0.6.5" merlin = "1.0.3" [dev-dependencies] +futures03 = { package = "futures-preview", version = "0.3.0-alpha.17", features = ["compat"] } keyring = { package = "substrate-keyring", path = "../../keyring" } substrate-executor = { path = "../../executor" } network = { package = "substrate-network", path = "../../network", features = ["test-helpers"]} diff --git a/substrate/core/consensus/babe/src/lib.rs b/substrate/core/consensus/babe/src/lib.rs index 616426021f..454b356f7c 100644 --- a/substrate/core/consensus/babe/src/lib.rs +++ b/substrate/core/consensus/babe/src/lib.rs @@ -878,6 +878,7 @@ mod tests { use client::BlockchainEvents; use test_client; use futures::{Async, stream::Stream as _}; + use futures03::{StreamExt as _, TryStreamExt as _}; use log::debug; use std::time::Duration; type Item = generic::DigestItem; @@ -1005,6 +1006,7 @@ mod tests { let environ = Arc::new(DummyFactory(client.clone())); import_notifications.push( client.import_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() .take_while(|n| Ok(!(n.origin != BlockOrigin::Own && n.header.number() < &5))) .for_each(move |_| Ok(())) ); diff --git a/substrate/core/finality-grandpa/Cargo.toml b/substrate/core/finality-grandpa/Cargo.toml index 7aeefa53cb..3cbb56df81 100644 --- a/substrate/core/finality-grandpa/Cargo.toml +++ b/substrate/core/finality-grandpa/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] fork-tree = { path = "../../core/utils/fork-tree" } futures = "0.1" +futures03 = { package = "futures-preview", version = "0.3.0-alpha.17", features = ["compat"] } log = "0.4" parking_lot = "0.8.0" tokio-executor = "0.1.7" diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 9afb4d1bb1..0a727df9a2 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -21,6 +21,7 @@ use network::test::{Block, DummySpecialization, Hash, TestNetFactory, Peer, Peer use network::test::{PassThroughVerifier}; use network::config::{ProtocolConfig, Roles, BoxFinalityProofRequestBuilder}; use parking_lot::Mutex; +use futures03::{StreamExt as _, TryStreamExt as _}; use tokio::runtime::current_thread; use keyring::ed25519::{Keyring as AuthorityKeyring}; use client::{ @@ -385,6 +386,7 @@ fn run_to_completion_with( wait_for.push( Box::new( client.finality_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() .take_while(move |n| { let mut highest_finalized = highest_finalized.write(); if *n.header.number() > *highest_finalized { @@ -495,6 +497,7 @@ fn finalize_3_voters_1_full_observer() { }; finality_notifications.push( client.finality_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() .take_while(|n| Ok(n.header.number() < &20)) .for_each(move |_| Ok(())) ); @@ -585,6 +588,7 @@ fn transition_3_voters_twice_1_full_observer() { // wait for blocks to be finalized before generating new ones let block_production = client.finality_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() .take_while(|n| Ok(n.header.number() < &30)) .for_each(move |n| { match n.header.number() { @@ -652,6 +656,7 @@ fn transition_3_voters_twice_1_full_observer() { finality_notifications.push( client.finality_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() .take_while(|n| Ok(n.header.number() < &30)) .for_each(move |_| Ok(())) .map(move |()| { @@ -1275,6 +1280,7 @@ fn finalize_3_voters_1_light_observer() { let link = net.lock().peer(3).data.lock().take().expect("link initialized on startup; qed"); let finality_notifications = net.lock().peer(3).client().finality_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() .take_while(|n| Ok(n.header.number() < &20)) .collect(); @@ -1436,6 +1442,7 @@ fn voter_catches_up_to_latest_round_when_behind() { finality_notifications.push( client.finality_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() .take_while(|n| Ok(n.header.number() < &50)) .for_each(move |_| Ok(())) ); @@ -1471,6 +1478,7 @@ fn voter_catches_up_to_latest_round_when_behind() { let set_state = link.persistent_data.set_state.clone(); let wait = client.finality_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() .take_while(|n| Ok(n.header.number() < &50)) .collect() .map(|_| set_state); diff --git a/substrate/core/finality-grandpa/src/until_imported.rs b/substrate/core/finality-grandpa/src/until_imported.rs index d10c2ac8e7..5575a0d2c6 100644 --- a/substrate/core/finality-grandpa/src/until_imported.rs +++ b/substrate/core/finality-grandpa/src/until_imported.rs @@ -23,9 +23,10 @@ use super::{BlockStatus, CommunicationIn, Error, SignedMessage}; use log::{debug, warn}; -use client::ImportNotifications; +use client::{BlockImportNotification, ImportNotifications}; use futures::prelude::*; use futures::stream::Fuse; +use futures03::{StreamExt as _, TryStreamExt as _}; use grandpa::voter; use parking_lot::Mutex; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; @@ -64,7 +65,7 @@ pub(crate) trait BlockUntilImported: Sized { /// Buffering imported messages until blocks with given hashes are imported. pub(crate) struct UntilImported> { - import_notifications: Fuse>, + import_notifications: Fuse, Error = ()> + Send>>, status_check: Status, inner: Fuse, ready: VecDeque, @@ -91,7 +92,10 @@ impl UntilImported let check_pending = Interval::new(now + CHECK_PENDING_INTERVAL, CHECK_PENDING_INTERVAL); UntilImported { - import_notifications: import_notifications.fuse(), + import_notifications: { + let stream = import_notifications.map::<_, fn(_) -> _>(|v| Ok::<_, ()>(v)).compat(); + Box::new(stream) as Box + Send> + }.fuse(), status_check, inner: stream.fuse(), ready: VecDeque::new(), @@ -194,7 +198,6 @@ impl Stream for UntilImported if self.import_notifications.is_done() && self.inner.is_done() { Ok(Async::Ready(None)) } else { - Ok(Async::NotReady) } } @@ -435,7 +438,7 @@ mod tests { use consensus_common::BlockOrigin; use client::BlockImportNotification; use futures::future::Either; - use futures::sync::mpsc; + use futures03::channel::mpsc; use grandpa::Precommit; #[derive(Clone)] @@ -523,7 +526,7 @@ mod tests { // enact all dependencies before importing the message enact_dependencies(&chain_state); - let (global_tx, global_rx) = mpsc::unbounded(); + let (global_tx, global_rx) = futures::sync::mpsc::unbounded(); let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, @@ -548,7 +551,7 @@ mod tests { let (chain_state, import_notifications) = TestChainState::new(); let block_status = chain_state.block_status(); - let (global_tx, global_rx) = mpsc::unbounded(); + let (global_tx, global_rx) = futures::sync::mpsc::unbounded(); let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index afddb00dea..446e0e0118 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -14,6 +14,7 @@ parking_lot = "0.8.0" bitflags = "1.0" fnv = "1.0" futures = "0.1.17" +futures03 = { package = "futures-preview", version = "0.3.0-alpha.17", features = ["compat"] } linked-hash-map = "0.5" linked_hash_set = "0.1.3" lru-cache = "0.1.1" diff --git a/substrate/core/network/src/on_demand_layer.rs b/substrate/core/network/src/on_demand_layer.rs index 86b3d6b7f4..17a70bbe0d 100644 --- a/substrate/core/network/src/on_demand_layer.rs +++ b/substrate/core/network/src/on_demand_layer.rs @@ -19,6 +19,7 @@ use crate::protocol::on_demand::RequestData; use std::sync::Arc; use futures::{prelude::*, sync::mpsc, sync::oneshot}; +use futures03::compat::{Compat01As03, Future01CompatExt as _}; use parking_lot::Mutex; use client::error::Error as ClientError; use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest, @@ -82,22 +83,22 @@ impl Fetcher for OnDemand where B: BlockT, B::Header: HeaderT, { - type RemoteHeaderResult = RemoteResponse; - type RemoteReadResult = RemoteResponse>>; - type RemoteCallResult = RemoteResponse>; - type RemoteChangesResult = RemoteResponse, u32)>>; - type RemoteBodyResult = RemoteResponse>; + type RemoteHeaderResult = Compat01As03>; + type RemoteReadResult = Compat01As03>>>; + type RemoteCallResult = Compat01As03>>; + type RemoteChangesResult = Compat01As03, u32)>>>; + type RemoteBodyResult = Compat01As03>>; fn remote_header(&self, request: RemoteHeaderRequest) -> Self::RemoteHeaderResult { let (sender, receiver) = oneshot::channel(); let _ = self.requests_send.unbounded_send(RequestData::RemoteHeader(request, sender)); - RemoteResponse { receiver } + RemoteResponse { receiver }.compat() } fn remote_read(&self, request: RemoteReadRequest) -> Self::RemoteReadResult { let (sender, receiver) = oneshot::channel(); let _ = self.requests_send.unbounded_send(RequestData::RemoteRead(request, sender)); - RemoteResponse { receiver } + RemoteResponse { receiver }.compat() } fn remote_read_child( @@ -106,25 +107,25 @@ impl Fetcher for OnDemand where ) -> Self::RemoteReadResult { let (sender, receiver) = oneshot::channel(); let _ = self.requests_send.unbounded_send(RequestData::RemoteReadChild(request, sender)); - RemoteResponse { receiver } + RemoteResponse { receiver }.compat() } fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult { let (sender, receiver) = oneshot::channel(); let _ = self.requests_send.unbounded_send(RequestData::RemoteCall(request, sender)); - RemoteResponse { receiver } + RemoteResponse { receiver }.compat() } fn remote_changes(&self, request: RemoteChangesRequest) -> Self::RemoteChangesResult { let (sender, receiver) = oneshot::channel(); let _ = self.requests_send.unbounded_send(RequestData::RemoteChanges(request, sender)); - RemoteResponse { receiver } + RemoteResponse { receiver }.compat() } fn remote_body(&self, request: RemoteBodyRequest) -> Self::RemoteBodyResult { let (sender, receiver) = oneshot::channel(); let _ = self.requests_send.unbounded_send(RequestData::RemoteBody(request, sender)); - RemoteResponse { receiver } + RemoteResponse { receiver }.compat() } } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 6ccedf7ef4..59a3680d40 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -27,7 +27,7 @@ use std::sync::Arc; use crate::config::build_multiaddr; use log::trace; use crate::chain::FinalityProofProvider; -use client::{self, ClientInfo, BlockchainEvents, ImportNotifications, FinalityNotifications}; +use client::{self, ClientInfo, BlockchainEvents, BlockImportNotification, FinalityNotifications, FinalityNotification}; use client::{in_mem::Backend as InMemoryBackend, error::Result as ClientResult}; use client::block_builder::BlockBuilder; use client::backend::AuxStore; @@ -40,6 +40,7 @@ use consensus::block_import::{BlockImport, ImportResult}; use consensus::{Error as ConsensusError, well_known_cache_keys::{self, Id as CacheKeyId}}; use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport}; use futures::prelude::*; +use futures03::{StreamExt as _, TryStreamExt as _}; use crate::{NetworkWorker, NetworkService, config::ProtocolId}; use crate::config::{NetworkConfiguration, TransportConfig, BoxFinalityProofRequestBuilder}; use libp2p::PeerId; @@ -216,8 +217,8 @@ pub struct Peer> { /// instead of going through the import queue. block_import: Box>, network: NetworkWorker::Hash>, - imported_blocks_stream: futures::stream::Fuse>, - finality_notification_stream: futures::stream::Fuse>, + imported_blocks_stream: Box, Error = ()> + Send>, + finality_notification_stream: Box, Error = ()> + Send>, } impl> Peer { @@ -482,8 +483,10 @@ pub trait TestNetFactory: Sized { peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone()); } - let imported_blocks_stream = client.import_notification_stream().fuse(); - let finality_notification_stream = client.finality_notification_stream().fuse(); + let imported_blocks_stream = Box::new(client.import_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat().fuse()); + let finality_notification_stream = Box::new(client.finality_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat().fuse()); peers.push(Peer { data, @@ -539,8 +542,10 @@ pub trait TestNetFactory: Sized { peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone()); } - let imported_blocks_stream = client.import_notification_stream().fuse(); - let finality_notification_stream = client.finality_notification_stream().fuse(); + let imported_blocks_stream = Box::new(client.import_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat().fuse()); + let finality_notification_stream = Box::new(client.finality_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat().fuse()); peers.push(Peer { data, diff --git a/substrate/core/rpc/Cargo.toml b/substrate/core/rpc/Cargo.toml index 7833f17f6b..181cbdfd8e 100644 --- a/substrate/core/rpc/Cargo.toml +++ b/substrate/core/rpc/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] derive_more = "0.14.0" futures = "0.1" +futures03 = { package = "futures-preview", version = "0.3.0-alpha.17", features = ["compat"] } jsonrpc-core = "12.0.0" jsonrpc-core-client = "12.0.0" jsonrpc-pubsub = "12.0.0" diff --git a/substrate/core/rpc/src/chain/mod.rs b/substrate/core/rpc/src/chain/mod.rs index 3594ed48e3..d1d476c3ab 100644 --- a/substrate/core/rpc/src/chain/mod.rs +++ b/substrate/core/rpc/src/chain/mod.rs @@ -23,6 +23,7 @@ pub mod number; mod tests; use std::sync::Arc; +use futures03::{future, StreamExt as _, TryStreamExt as _}; use client::{self, Client, BlockchainEvents}; use crate::rpc::Result as RpcResult; @@ -203,8 +204,9 @@ impl ChainApi, Block::Hash, Block::Header, Sig subscriber, || self.block_hash(None.into()), || self.client.import_notification_stream() - .filter(|notification| notification.is_new_best) - .map(|notification| notification.header), + .filter(|notification| future::ready(notification.is_new_best)) + .map(|notification| Ok::<_, ()>(notification.header)) + .compat(), ) } @@ -217,7 +219,8 @@ impl ChainApi, Block::Hash, Block::Header, Sig subscriber, || Ok(Some(self.client.info().chain.finalized_hash)), || self.client.finality_notification_stream() - .map(|notification| notification.header), + .map(|notification| Ok::<_, ()>(notification.header)) + .compat(), ) } diff --git a/substrate/core/rpc/src/state/mod.rs b/substrate/core/rpc/src/state/mod.rs index 0b3b93885e..40ee94fdb2 100644 --- a/substrate/core/rpc/src/state/mod.rs +++ b/substrate/core/rpc/src/state/mod.rs @@ -26,6 +26,7 @@ use std::{ ops::Range, sync::Arc, }; +use futures03::{future, StreamExt as _, TryStreamExt as _}; use client::{self, Client, CallExecutor, BlockchainEvents, runtime_api::Metadata}; use crate::rpc::Result as RpcResult; @@ -484,14 +485,14 @@ impl StateApi for State where self.subscriptions.add(subscriber, |sink| { let stream = stream - .map_err(|e| warn!("Error creating storage notification stream: {:?}", e)) - .map(|(block, changes)| Ok(StorageChangeSet { + .map(|(block, changes)| Ok::<_, ()>(Ok(StorageChangeSet { block, changes: changes.iter() .filter_map(|(o_sk, k, v)| if o_sk.is_none() { Some((k.clone(),v.cloned())) } else { None }).collect(), - })); + }))) + .compat(); sink .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) @@ -530,7 +531,6 @@ impl StateApi for State where let mut previous_version = version.clone(); let stream = stream - .map_err(|e| warn!("Error creating storage notification stream: {:?}", e)) .filter_map(move |_| { let info = client.info(); let version = client @@ -539,11 +539,12 @@ impl StateApi for State where .map_err(Into::into); if previous_version != version { previous_version = version.clone(); - Some(version) + future::ready(Some(Ok::<_, ()>(version))) } else { - None + future::ready(None) } - }); + }) + .compat(); sink .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) diff --git a/substrate/core/service/Cargo.toml b/substrate/core/service/Cargo.toml index 392373cc18..8be5002d2d 100644 --- a/substrate/core/service/Cargo.toml +++ b/substrate/core/service/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] derive_more = "0.14.0" futures = "0.1.17" +futures03 = { package = "futures-preview", version = "0.3.0-alpha.17", features = ["compat"] } parking_lot = "0.8.0" lazy_static = "1.0" log = "0.4" diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 2f305b5e72..60f93b2da2 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -30,6 +30,7 @@ use std::net::SocketAddr; use std::collections::HashMap; use std::time::Duration; use futures::sync::mpsc; +use futures03::{StreamExt as _, TryStreamExt as _}; use parking_lot::Mutex; use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT}; @@ -294,6 +295,7 @@ impl Service { let to_spawn_tx_ = to_spawn_tx.clone(); let events = client.import_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() .for_each(move |notification| { let number = *notification.header.number(); @@ -623,8 +625,10 @@ fn build_network_future< const STATUS_INTERVAL: Duration = Duration::from_millis(5000); let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); - let mut imported_blocks_stream = client.import_notification_stream().fuse(); - let mut finality_notification_stream = client.finality_notification_stream().fuse(); + let mut imported_blocks_stream = client.import_notification_stream().fuse() + .map(|v| Ok::<_, ()>(v)).compat(); + let mut finality_notification_stream = client.finality_notification_stream().fuse() + .map(|v| Ok::<_, ()>(v)).compat(); futures::future::poll_fn(move || { // We poll `imported_blocks_stream`. diff --git a/substrate/core/test-client/Cargo.toml b/substrate/core/test-client/Cargo.toml index 55d94adb9a..0d709fab68 100644 --- a/substrate/core/test-client/Cargo.toml +++ b/substrate/core/test-client/Cargo.toml @@ -9,7 +9,7 @@ client = { package = "substrate-client", path = "../client" } client-db = { package = "substrate-client-db", path = "../client/db", features = ["test-helpers"] } consensus = { package = "substrate-consensus-common", path = "../consensus/common" } executor = { package = "substrate-executor", path = "../executor" } -futures = { version = "0.1.27" } +futures-preview = "0.3.0-alpha.17" hash-db = "0.14.0" keyring = { package = "substrate-keyring", path = "../keyring" } parity-codec = "4.1.1" diff --git a/substrate/core/test-client/src/lib.rs b/substrate/core/test-client/src/lib.rs index 40fbd10d9e..509863e4e5 100644 --- a/substrate/core/test-client/src/lib.rs +++ b/substrate/core/test-client/src/lib.rs @@ -32,7 +32,7 @@ pub use state_machine::ExecutionStrategy; use std::sync::Arc; use std::collections::HashMap; -use futures::future::FutureResult; +use futures::future::Ready; use hash_db::Hasher; use primitives::storage::well_known_keys; use runtime_primitives::traits::{ @@ -220,11 +220,11 @@ impl TestClientBuilder< } impl client::light::fetcher::Fetcher for LightFetcher { - type RemoteHeaderResult = FutureResult; - type RemoteReadResult = FutureResult>, client::error::Error>; - type RemoteCallResult = FutureResult, client::error::Error>; - type RemoteChangesResult = FutureResult, u32)>, client::error::Error>; - type RemoteBodyResult = FutureResult, client::error::Error>; + type RemoteHeaderResult = Ready>; + type RemoteReadResult = Ready>, client::error::Error>>; + type RemoteCallResult = Ready, client::error::Error>>; + type RemoteChangesResult = Ready, u32)>, client::error::Error>>; + type RemoteBodyResult = Ready, client::error::Error>>; fn remote_header( &self,