diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index a218c5638e..865dc1c2cf 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -2255,6 +2255,7 @@ version = "2.0.0" dependencies = [ "exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "node-executor 2.0.0", @@ -4190,6 +4191,7 @@ dependencies = [ name = "substrate-basic-authorship" version = "2.0.0" dependencies = [ + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 2.0.0", @@ -4310,6 +4312,7 @@ dependencies = [ "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4331,7 +4334,6 @@ dependencies = [ "substrate-telemetry 2.0.0", "substrate-test-runtime-client 2.0.0", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -4353,6 +4355,7 @@ dependencies = [ "fork-tree 2.0.0", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "merlin 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "num-bigint 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4380,7 +4383,6 @@ dependencies = [ "substrate-telemetry 2.0.0", "substrate-test-runtime-client 2.0.0", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -4401,7 +4403,8 @@ name = "substrate-consensus-common" version = "2.0.0" dependencies = [ "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4412,8 +4415,6 @@ dependencies = [ "substrate-inherents 2.0.0", "substrate-primitives 2.0.0", "substrate-test-runtime-client 2.0.0", - "tokio-executor 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -4444,7 +4445,8 @@ dependencies = [ name = "substrate-consensus-slots" version = "2.0.0" dependencies = [ - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4454,7 +4456,6 @@ dependencies = [ "substrate-inherents 2.0.0", "substrate-primitives 2.0.0", "substrate-test-runtime-client 2.0.0", - "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/substrate/core/basic-authorship/Cargo.toml b/substrate/core/basic-authorship/Cargo.toml index fa409f1b74..ca6e4f0b8a 100644 --- a/substrate/core/basic-authorship/Cargo.toml +++ b/substrate/core/basic-authorship/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] log = "0.4" +futures-preview = "0.3.0-alpha.17" codec = { package = "parity-codec", version = "4.1.1" } runtime_primitives = { package = "sr-primitives", path = "../../core/sr-primitives" } client = { package = "substrate-client", path = "../../core/client" } diff --git a/substrate/core/basic-authorship/src/basic_authorship.rs b/substrate/core/basic-authorship/src/basic_authorship.rs index 56a959ccbc..00408a1545 100644 --- a/substrate/core/basic-authorship/src/basic_authorship.rs +++ b/substrate/core/basic-authorship/src/basic_authorship.rs @@ -171,7 +171,7 @@ impl consensus_common::Proposer<::Block> for Pro A: txpool::ChainApi, client::error::Error: From<::Error> { - type Create = Result<::Block, error::Error>; + type Create = futures::future::Ready::Block, error::Error>>; type Error = error::Error; fn propose( @@ -179,11 +179,10 @@ impl consensus_common::Proposer<::Block> for Pro inherent_data: InherentData, inherent_digests: DigestFor, max_duration: time::Duration, - ) -> Result<::Block, error::Error> - { + ) -> Self::Create { // leave some time for evaluation and block finalization (33%) let deadline = (self.now)() + max_duration - max_duration / 3; - self.propose_with(inherent_data, inherent_digests, deadline) + futures::future::ready(self.propose_with(inherent_data, inherent_digests, deadline)) } } @@ -328,7 +327,8 @@ mod tests { cell.replace(new) }); let deadline = time::Duration::from_secs(3); - let block = proposer.propose(Default::default(), Default::default(), deadline).unwrap(); + let block = futures::executor::block_on(proposer.propose(Default::default(), Default::default(), deadline)) + .unwrap(); // then // block should have some extrinsics although we have some more in the pool. diff --git a/substrate/core/consensus/aura/Cargo.toml b/substrate/core/consensus/aura/Cargo.toml index 03ddf79be3..add7d8b217 100644 --- a/substrate/core/consensus/aura/Cargo.toml +++ b/substrate/core/consensus/aura/Cargo.toml @@ -19,13 +19,13 @@ client = { package = "substrate-client", path = "../../client" } substrate-telemetry = { path = "../../telemetry" } consensus_common = { package = "substrate-consensus-common", path = "../common" } runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" } -futures = "0.1.17" -tokio-timer = "0.2.11" +futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +futures01 = { package = "futures", version = "0.1" } +futures-timer = "0.2.1" parking_lot = "0.8.0" log = "0.4" [dev-dependencies] -futures03 = { package = "futures-preview", version = "0.3.0-alpha.17", features = ["compat"] } keyring = { package = "substrate-keyring", path = "../../keyring" } substrate-executor = { path = "../../executor" } network = { package = "substrate-network", path = "../../network", features = ["test-helpers"]} diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index 2bfd907646..964f7231b1 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -28,7 +28,7 @@ //! //! NOTE: Aura itself is designed to be generic over the crypto used. #![forbid(missing_docs, unsafe_code)] -use std::{sync::Arc, time::Duration, thread, marker::PhantomData, hash::Hash, fmt::Debug}; +use std::{sync::Arc, time::Duration, thread, marker::PhantomData, hash::Hash, fmt::Debug, pin::Pin}; use parity_codec::{Encode, Decode, Codec}; use consensus_common::{self, BlockImport, Environment, Proposer, @@ -52,9 +52,9 @@ use runtime_primitives::traits::{Block as BlockT, Header, DigestItemFor, Provide use primitives::Pair; use inherents::{InherentDataProviders, InherentData}; -use futures::{Future, IntoFuture, future}; +use futures::{prelude::*, future}; use parking_lot::Mutex; -use tokio_timer::Timeout; +use futures_timer::Delay; use log::{error, warn, debug, info, trace}; use srml_aura::{ @@ -128,7 +128,7 @@ impl SlotCompatible for AuraSlotCompatible { } } -/// Start the aura worker. The returned future should be run in a tokio runtime. +/// Start the aura worker. The returned future should be run in a futures executor. pub fn start_aura( slot_duration: SlotDuration, local_key: Arc

, @@ -139,13 +139,13 @@ pub fn start_aura( sync_oracle: SO, inherent_data_providers: InherentDataProviders, force_authoring: bool, -) -> Result, consensus_common::Error> where +) -> Result, consensus_common::Error> where B: BlockT, C: ProvideRuntimeApi + ProvideCache + AuxStore + Send + Sync, C::Api: AuraApi>, SC: SelectChain, E::Proposer: Proposer, - <>::Create as IntoFuture>::Future: Send + 'static, + >::Create: Unpin + Send + 'static, P: Pair + Send + Sync + 'static, P::Public: Hash + Member + Encode + Decode, P::Signature: Hash + Member + Encode + Decode, @@ -174,7 +174,7 @@ pub fn start_aura( sync_oracle, inherent_data_providers, AuraSlotCompatible, - )) + ).map(|()| Ok::<(), ()>(())).compat()) } struct AuraWorker { @@ -192,7 +192,7 @@ impl SlotWorker for AuraWorker w C::Api: AuraApi>, E: Environment, E::Proposer: Proposer, - <>::Create as IntoFuture>::Future: Send + 'static, + >::Create: Unpin + Send + 'static, H: Header, I: BlockImport + Send + Sync + 'static, P: Pair + Send + Sync + 'static, @@ -201,7 +201,7 @@ impl SlotWorker for AuraWorker w SO: SyncOracle + Send + Clone, Error: ::std::error::Error + Send + From<::consensus_common::Error> + From + 'static, { - type OnSlot = Box + Send>; + type OnSlot = Pin> + Send>>; fn on_slot( &self, @@ -228,7 +228,7 @@ impl SlotWorker for AuraWorker w telemetry!(CONSENSUS_WARN; "aura.unable_fetching_authorities"; "slot" => ?chain_head.hash(), "err" => ?e ); - return Box::new(future::ok(())); + return Box::pin(future::ready(Ok(()))); } }; @@ -237,11 +237,11 @@ impl SlotWorker for AuraWorker w telemetry!(CONSENSUS_DEBUG; "aura.skipping_proposal_slot"; "authorities_len" => authorities.len() ); - return Box::new(future::ok(())); + return Box::pin(future::ready(Ok(()))); } let maybe_author = slot_author::

(slot_num, &authorities); let proposal_work = match maybe_author { - None => return Box::new(future::ok(())), + None => return Box::pin(future::ready(Ok(()))), Some(author) => if author == &public_key { debug!( target: "aura", "Starting authorship at slot {}; timestamp = {}", @@ -260,14 +260,14 @@ impl SlotWorker for AuraWorker w telemetry!(CONSENSUS_WARN; "aura.unable_authoring_block"; "slot" => slot_num, "err" => ?e ); - return Box::new(future::ok(())) + return Box::pin(future::ready(Ok(()))) } }; let remaining_duration = slot_info.remaining_duration(); // deadline our production to approx. the end of the // slot - Timeout::new( + futures::future::select( proposer.propose( slot_info.inherent_data, generic::Digest { @@ -276,15 +276,21 @@ impl SlotWorker for AuraWorker w ], }, remaining_duration, - ).into_future(), - remaining_duration, - ) + ).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e)).into()), + Delay::new(remaining_duration) + .map_err(|err| consensus_common::Error::FaultyTimer(err).into()) + ).map(|v| match v { + futures::future::Either::Left((v, _)) => v, + futures::future::Either::Right((Ok(_), _)) => + Err(consensus_common::Error::ClientImport("Timeout in the AuRa proposer".into())), + futures::future::Either::Right((Err(err), _)) => Err(err), + }) } else { - return Box::new(future::ok(())); + return Box::pin(future::ready(Ok(()))); } }; - Box::new(proposal_work.map(move |b| { + Box::pin(proposal_work.map_ok(move |b| { // minor hack since we don't have access to the timestamp // that is actually set by the proposer. let slot_after_building = SignedDuration::default().slot_now(slot_duration); @@ -346,7 +352,7 @@ impl SlotWorker for AuraWorker w "hash" => ?parent_hash, "err" => ?e ); } - }).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e)).into())) + })) } } @@ -708,8 +714,6 @@ pub fn import_queue( #[cfg(test)] mod tests { use super::*; - use futures::{Async, stream::Stream as _}; - use futures03::{StreamExt as _, TryStreamExt as _}; use consensus_common::NoNetwork as DummyOracle; use network::test::*; use network::test::{Block as TestBlock, PeersClient, PeersFullClient}; @@ -747,15 +751,16 @@ mod tests { impl Proposer for DummyProposer { type Error = Error; - type Create = Result; + type Create = future::Ready>; fn propose( &self, _: InherentData, digests: DigestFor, _: Duration, - ) -> Result { - self.1.new_block(digests).unwrap().bake().map_err(|e| e.into()) + ) -> Self::Create { + let r = self.1.new_block(digests).unwrap().bake().map_err(|e| e.into()); + future::ready(r) } } @@ -839,9 +844,8 @@ mod tests { let environ = Arc::new(DummyFactory(client.clone())); import_notifications.push( client.import_notification_stream() - .map(|v| Ok::<_, ()>(v)).compat() - .take_while(|n| Ok(!(n.origin != BlockOrigin::Own && n.header.number() < &5))) - .for_each(move |_| Ok(())) + .take_while(|n| future::ready(!(n.origin != BlockOrigin::Own && n.header.number() < &5))) + .for_each(move |_| future::ready(())) ); let slot_duration = SlotDuration::get_or_compute(&*client) @@ -867,13 +871,13 @@ mod tests { runtime.spawn(aura); } - // wait for all finalized on each. - let wait_for = ::futures::future::join_all(import_notifications) - .map(|_| ()) - .map_err(|_| ()); + runtime.spawn(futures01::future::poll_fn(move || { + net.lock().poll(); + Ok::<_, ()>(futures01::Async::NotReady::<()>) + })); - let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) }); - let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); + runtime.block_on(future::join_all(import_notifications) + .map(|_| Ok::<(), ()>(())).compat()).unwrap(); } #[test] diff --git a/substrate/core/consensus/babe/Cargo.toml b/substrate/core/consensus/babe/Cargo.toml index 516ac1a40d..deb716a7ff 100644 --- a/substrate/core/consensus/babe/Cargo.toml +++ b/substrate/core/consensus/babe/Cargo.toml @@ -23,9 +23,9 @@ consensus_common = { package = "substrate-consensus-common", path = "../common" slots = { package = "substrate-consensus-slots", path = "../slots" } runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" } fork-tree = { path = "../../utils/fork-tree" } -futures = "0.1.26" -futures03 = { package = "futures-preview", version = "0.3.0-alpha.17", features = ["compat"] } -tokio-timer = "0.2.11" +futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +futures01 = { package = "futures", version = "0.1" } +futures-timer = "0.2.1" parking_lot = "0.8.0" log = "0.4.6" schnorrkel = "0.1.1" diff --git a/substrate/core/consensus/babe/src/lib.rs b/substrate/core/consensus/babe/src/lib.rs index 4068ff633a..38b6257c8a 100644 --- a/substrate/core/consensus/babe/src/lib.rs +++ b/substrate/core/consensus/babe/src/lib.rs @@ -32,7 +32,7 @@ use runtime_primitives::traits::{ Block as BlockT, Header, DigestItemFor, NumberFor, ProvideRuntimeApi, SimpleBitOps, Zero, }; -use std::{collections::HashMap, sync::Arc, u64, fmt::{Debug, Display}, time::{Instant, Duration}}; +use std::{collections::HashMap, sync::Arc, u64, fmt::{Debug, Display}, pin::Pin, time::{Instant, Duration}}; use runtime_support::serde::{Serialize, Deserialize}; use parity_codec::{Decode, Encode}; use parking_lot::{Mutex, MutexGuard}; @@ -74,9 +74,9 @@ use client::{ }; use fork_tree::ForkTree; use slots::{CheckedHeader, check_equivocation}; -use futures::{Future, IntoFuture, future, stream::Stream}; -use futures03::{StreamExt as _, TryStreamExt as _}; -use tokio_timer::Timeout; +use futures::{prelude::*, future}; +use futures01::Stream as _; +use futures_timer::Delay; use log::{error, warn, debug, info, trace}; use slots::{SlotWorker, SlotData, SlotInfo, SlotCompatible, SignedDuration}; @@ -182,7 +182,7 @@ pub fn start_babe(BabeParams { force_authoring, time_source, }: BabeParams) -> Result< - impl Future, + impl futures01::Future, consensus_common::Error, > where B: BlockT, @@ -190,7 +190,7 @@ pub fn start_babe(BabeParams { C::Api: BabeApi, SC: SelectChain, E::Proposer: Proposer, - <>::Create as IntoFuture>::Future: Send + 'static, + >::Create: Unpin + Send + 'static, H: Header, E: Environment, I: BlockImport + Send + Sync + 'static, @@ -214,7 +214,7 @@ pub fn start_babe(BabeParams { sync_oracle, inherent_data_providers, time_source, - )) + ).map(|()| Ok::<(), ()>(())).compat()) } struct BabeWorker { @@ -233,7 +233,7 @@ impl SlotWorker for BabeWorker w C::Api: BabeApi, E: Environment, E::Proposer: Proposer, - <>::Create as IntoFuture>::Future: Send + 'static, + >::Create: Unpin + Send + 'static, Hash: Debug + Eq + Copy + SimpleBitOps + Encode + Decode + Serialize + for<'de> Deserialize<'de> + Debug + Default + AsRef<[u8]> + AsMut<[u8]> + std::hash::Hash + Display + Send + Sync + 'static, @@ -242,7 +242,7 @@ impl SlotWorker for BabeWorker w SO: SyncOracle + Send + Clone, Error: std::error::Error + Send + From<::consensus_common::Error> + From + 'static, { - type OnSlot = Box + Send>; + type OnSlot = Pin> + Send>>; fn on_slot( &self, @@ -269,7 +269,7 @@ impl SlotWorker for BabeWorker w telemetry!(CONSENSUS_WARN; "babe.unable_fetching_authorities"; "slot" => ?chain_head.hash(), "err" => ?e ); - return Box::new(future::ok(())); + return Box::pin(future::ready(Ok(()))); } }; @@ -284,7 +284,7 @@ impl SlotWorker for BabeWorker w telemetry!(CONSENSUS_DEBUG; "babe.skipping_proposal_slot"; "authorities_len" => authorities.len() ); - return Box::new(future::ok(())); + return Box::pin(future::ready(Ok(()))); } let proposal_work = if let Some(claim) = claim_slot( @@ -316,7 +316,7 @@ impl SlotWorker for BabeWorker w telemetry!(CONSENSUS_WARN; "babe.unable_authoring_block"; "slot" => slot_number, "err" => ?e ); - return Box::new(future::ok(())) + return Box::pin(future::ready(Ok(()))) } }; @@ -329,7 +329,7 @@ impl SlotWorker for BabeWorker w // deadline our production to approx. the end of the slot let remaining_duration = slot_info.remaining_duration(); - Timeout::new( + futures::future::select( proposer.propose( slot_info.inherent_data, generic::Digest { @@ -338,14 +338,20 @@ impl SlotWorker for BabeWorker w ], }, remaining_duration, - ).into_future(), - remaining_duration, - ) + ).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e)).into()), + Delay::new(remaining_duration) + .map_err(|err| consensus_common::Error::FaultyTimer(err).into()) + ).map(|v| match v { + futures::future::Either::Left((v, _)) => v, + futures::future::Either::Right((Ok(_), _)) => + Err(consensus_common::Error::ClientImport("Timeout in the BaBe proposer".into())), + futures::future::Either::Right((Err(err), _)) => Err(err), + }) } else { - return Box::new(future::ok(())); + return Box::pin(future::ready(Ok(()))); }; - Box::new(proposal_work.map(move |b| { + Box::pin(proposal_work.map_ok(move |b| { // minor hack since we don't have access to the timestamp // that is actually set by the proposer. let slot_after_building = SignedDuration::default().slot_now(slot_duration); @@ -402,9 +408,6 @@ impl SlotWorker for BabeWorker w "hash" => ?parent_hash, "err" => ?e ); } - }).map_err(|e| { - warn!("Client import failed: {:?}", e); - consensus_common::Error::ClientImport(format!("{:?}", e)).into() })) } } @@ -1137,7 +1140,7 @@ pub fn import_queue, I, RA, PRA>( BabeImportQueue, BabeLink, BabeBlockImport, - impl Future, + impl futures01::Future, )> where B: Backend + 'static, I: BlockImport + Clone + Send + Sync + 'static, diff --git a/substrate/core/consensus/babe/src/tests.rs b/substrate/core/consensus/babe/src/tests.rs index 571951b3b7..1a68c35dbe 100644 --- a/substrate/core/consensus/babe/src/tests.rs +++ b/substrate/core/consensus/babe/src/tests.rs @@ -32,7 +32,6 @@ use keyring::sr25519::Keyring; use super::generic::DigestItem; use client::BlockchainEvents; use test_client; -use futures::Async; use log::debug; use std::{time::Duration, borrow::Borrow, cell::RefCell}; type Item = generic::DigestItem; @@ -62,15 +61,15 @@ impl Environment for DummyFactory { impl Proposer for DummyProposer { type Error = Error; - type Create = Result; + type Create = future::Ready>; fn propose( &self, _: InherentData, digests: DigestFor, _: Duration, - ) -> Result { - self.1.new_block(digests).unwrap().bake().map_err(|e| e.into()) + ) -> Self::Create { + future::ready(self.1.new_block(digests).unwrap().bake().map_err(|e| e.into())) } } @@ -203,9 +202,8 @@ fn run_one_test() { let environ = Arc::new(DummyFactory(client.clone())); import_notifications.push( client.import_notification_stream() - .map(|v| Ok::<_, ()>(v)).compat() - .take_while(|n| Ok(n.header.number() < &5)) - .for_each(move |_| Ok(())) + .take_while(|n| future::ready(!(n.origin != BlockOrigin::Own && n.header.number() < &5))) + .for_each(move |_| future::ready(())) ); let config = Config::get_or_compute(&*client) @@ -234,14 +232,13 @@ fn run_one_test() { }).expect("Starts babe")); } - // wait for all finalized on each. - let wait_for = futures::future::join_all(import_notifications); - - let drive_to_completion = futures::future::poll_fn(|| { + runtime.spawn(futures01::future::poll_fn(move || { net.lock().poll(); - Ok(Async::NotReady) - }); - let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); + Ok::<_, ()>(futures01::Async::NotReady::<()>) + })); + + runtime.block_on(future::join_all(import_notifications) + .map(|_| Ok::<(), ()>(())).compat()).unwrap(); } #[test] diff --git a/substrate/core/consensus/common/Cargo.toml b/substrate/core/consensus/common/Cargo.toml index c44b34baeb..bb0970f8a2 100644 --- a/substrate/core/consensus/common/Cargo.toml +++ b/substrate/core/consensus/common/Cargo.toml @@ -11,12 +11,11 @@ libp2p = { version = "0.11.0", default-features = false } log = "0.4" primitives = { package = "substrate-primitives", path= "../../primitives" } inherents = { package = "substrate-inherents", path = "../../inherents" } -futures = "0.1" +futures-preview = "0.3.0-alpha.17" +futures-timer = "0.2.1" rstd = { package = "sr-std", path = "../../sr-std" } runtime_version = { package = "sr-version", path = "../../sr-version" } runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" } -tokio-executor = "0.1.6" -tokio-timer = "0.2" parity-codec = { version = "4.1.1", features = ["derive"] } parking_lot = "0.8.0" diff --git a/substrate/core/consensus/common/src/error.rs b/substrate/core/consensus/common/src/error.rs index d8683d0b68..cb57bb915e 100644 --- a/substrate/core/consensus/common/src/error.rs +++ b/substrate/core/consensus/common/src/error.rs @@ -33,7 +33,7 @@ pub enum Error { IoTerminated, /// Unable to schedule wakeup. #[display(fmt="Timer error: {}", _0)] - FaultyTimer(tokio_timer::Error), + FaultyTimer(std::io::Error), /// Error while working with inherent data. #[display(fmt="InherentData error: {}", _0)] InherentData(String), diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index 5e7969cc02..085f1e4177 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -106,8 +106,8 @@ pub trait ImportQueue: Send { /// /// This method should behave in a way similar to `Future::poll`. It can register the current /// task and notify later when more actions are ready to be polled. To continue the comparison, - /// it is as if this method always returned `Ok(Async::NotReady)`. - fn poll_actions(&mut self, link: &mut dyn Link); + /// it is as if this method always returned `Poll::Pending`. + fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &mut dyn Link); } /// Hooks that the verification queue can use to influence the synchronization diff --git a/substrate/core/consensus/common/src/import_queue/basic_queue.rs b/substrate/core/consensus/common/src/import_queue/basic_queue.rs index 51d30cddbb..082006ae5d 100644 --- a/substrate/core/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/core/consensus/common/src/import_queue/basic_queue.rs @@ -14,8 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use std::sync::Arc; -use futures::{prelude::*, future::Executor, sync::mpsc}; +use std::{pin::Pin, sync::Arc}; +use futures::{prelude::*, channel::mpsc, task::SpawnExt as _, task::Context, task::Poll}; use runtime_primitives::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}}; use crate::error::Error as ConsensusError; @@ -34,15 +34,12 @@ pub struct BasicQueue { sender: mpsc::UnboundedSender>, /// Results coming from the worker task. result_port: BufferedLinkReceiver, - /// Since we have to be in a tokio context in order to spawn background tasks, we first store - /// the task to spawn here, then extract it as soon as we are in a tokio context. - /// If `Some`, contains the task to spawn in the background. If `None`, the future has already - /// been spawned. - future_to_spawn: Option + Send>>, /// If it isn't possible to spawn the future in `future_to_spawn` (which is notably the case in /// "no std" environment), we instead put it in `manual_poll`. It is then polled manually from /// `poll_actions`. - manual_poll: Option + Send>>, + manual_poll: Option + Send>>>, + /// A thread pool where the background worker is being run. + pool: Option, } impl BasicQueue { @@ -65,11 +62,27 @@ impl BasicQueue { finality_proof_import, ); + let mut pool = futures::executor::ThreadPool::builder() + .name_prefix("import-queue-worker-") + .pool_size(1) + .create() + .ok(); + + let manual_poll; + if let Some(pool) = &mut pool { + // TODO: this expect() can be removed once + // https://github.com/rust-lang-nursery/futures-rs/pull/1750 is merged and deployed + pool.spawn(future).expect("ThreadPool can never fail to spawn tasks; QED"); + manual_poll = None; + } else { + manual_poll = Some(Box::pin(future) as Pin>); + } + Self { sender: worker_sender, result_port, - future_to_spawn: Some(Box::new(future)), - manual_poll: None, + manual_poll, + pool, } } } @@ -99,25 +112,17 @@ impl ImportQueue for BasicQueue { let _ = self.sender.unbounded_send(ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)); } - fn poll_actions(&mut self, link: &mut dyn Link) { - // Try to spawn the future in `future_to_spawn`. - if let Some(future) = self.future_to_spawn.take() { - if let Err(err) = tokio_executor::DefaultExecutor::current().execute(future) { - debug_assert!(self.manual_poll.is_none()); - self.manual_poll = Some(err.into_future()); - } - } - + fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link) { // As a backup mechanism, if we failed to spawn the `future_to_spawn`, we instead poll // manually here. if let Some(manual_poll) = self.manual_poll.as_mut() { - match manual_poll.poll() { - Ok(Async::NotReady) => {} + match Future::poll(Pin::new(manual_poll), cx) { + Poll::Pending => {} _ => self.manual_poll = None, } } - self.result_port.poll_actions(link); + self.result_port.poll_actions(cx, link); } } @@ -144,7 +149,7 @@ impl> BlockImportWorker { block_import: BoxBlockImport, justification_import: Option>, finality_proof_import: Option>, - ) -> (impl Future + Send, mpsc::UnboundedSender>) { + ) -> (impl Future + Send, mpsc::UnboundedSender>) { let (sender, mut port) = mpsc::unbounded(); let mut worker = BlockImportWorker { @@ -167,12 +172,12 @@ impl> BlockImportWorker { } } - let future = futures::future::poll_fn(move || { + let future = futures::future::poll_fn(move |cx| { loop { - let msg = match port.poll() { - Ok(Async::Ready(Some(msg))) => msg, - Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())), - Ok(Async::NotReady) => return Ok(Async::NotReady), + let msg = match Stream::poll_next(Pin::new(&mut port), cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, }; match msg { diff --git a/substrate/core/consensus/common/src/import_queue/buffered_link.rs b/substrate/core/consensus/common/src/import_queue/buffered_link.rs index 9c555ba9d9..ffd08e690a 100644 --- a/substrate/core/consensus/common/src/import_queue/buffered_link.rs +++ b/substrate/core/consensus/common/src/import_queue/buffered_link.rs @@ -20,7 +20,7 @@ //! //! # Example //! -//! ```no_run +//! ``` //! use substrate_consensus_common::import_queue::Link; //! # use substrate_consensus_common::import_queue::buffered_link::buffered_link; //! # use test_client::runtime::Block; @@ -28,12 +28,18 @@ //! # let mut my_link = DummyLink; //! let (mut tx, mut rx) = buffered_link::(); //! tx.blocks_processed(0, 0, vec![]); -//! rx.poll_actions(&mut my_link); // Calls `my_link.blocks_processed(0, 0, vec![])` +//! +//! // Calls `my_link.blocks_processed(0, 0, vec![])` when polled. +//! let _fut = futures::future::poll_fn(move |cx| { +//! rx.poll_actions(cx, &mut my_link); +//! std::task::Poll::Pending::<()> +//! }); //! ``` //! -use futures::{prelude::*, sync::mpsc}; +use futures::{prelude::*, channel::mpsc}; use runtime_primitives::traits::{Block as BlockT, NumberFor}; +use std::{pin::Pin, task::Context, task::Poll}; use crate::import_queue::{Origin, Link, BlockImportResult, BlockImportError}; /// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and @@ -120,10 +126,10 @@ impl BufferedLinkReceiver { /// /// This method should behave in a way similar to `Future::poll`. It can register the current /// task and notify later when more actions are ready to be polled. To continue the comparison, - /// it is as if this method always returned `Ok(Async::NotReady)`. - pub fn poll_actions(&mut self, link: &mut dyn Link) { + /// it is as if this method always returned `Poll::Pending`. + pub fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link) { loop { - let msg = if let Ok(Async::Ready(Some(msg))) = self.rx.poll() { + let msg = if let Poll::Ready(Some(msg)) = Stream::poll_next(Pin::new(&mut self.rx), cx) { msg } else { break diff --git a/substrate/core/consensus/common/src/lib.rs b/substrate/core/consensus/common/src/lib.rs index 0c968a327d..d901610df9 100644 --- a/substrate/core/consensus/common/src/lib.rs +++ b/substrate/core/consensus/common/src/lib.rs @@ -75,7 +75,7 @@ pub trait Proposer { /// Error type which can occur when proposing or evaluating. type Error: From + ::std::fmt::Debug + 'static; /// Future that resolves to a committed proposal. - type Create: IntoFuture; + type Create: Future>; /// Create a proposal. fn propose( &self, diff --git a/substrate/core/consensus/slots/Cargo.toml b/substrate/core/consensus/slots/Cargo.toml index fa856bbfbb..29c11f7533 100644 --- a/substrate/core/consensus/slots/Cargo.toml +++ b/substrate/core/consensus/slots/Cargo.toml @@ -12,8 +12,8 @@ primitives = { package = "substrate-primitives", path = "../../primitives" } runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" } consensus_common = { package = "substrate-consensus-common", path = "../common" } inherents = { package = "substrate-inherents", path = "../../inherents" } -futures = "0.1.17" -tokio-timer = "0.2.11" +futures-preview = "0.3.0-alpha.17" +futures-timer = "0.2.1" parking_lot = "0.8.0" log = "0.4" diff --git a/substrate/core/consensus/slots/src/lib.rs b/substrate/core/consensus/slots/src/lib.rs index 816a61babd..7e39d49651 100644 --- a/substrate/core/consensus/slots/src/lib.rs +++ b/substrate/core/consensus/slots/src/lib.rs @@ -32,23 +32,18 @@ pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND}; use codec::{Decode, Encode}; use consensus_common::{SyncOracle, SelectChain}; -use futures::prelude::*; -use futures::{ - future::{self, Either}, - Future, IntoFuture, -}; +use futures::{prelude::*, future::{self, Either}, task::Poll}; use inherents::{InherentData, InherentDataProviders}; use log::{debug, error, info, warn}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{ApiRef, Block as BlockT, ProvideRuntimeApi}; -use std::fmt::Debug; -use std::ops::Deref; +use std::{fmt::Debug, ops::Deref, panic, pin::Pin}; /// A worker that should be invoked at every new slot. pub trait SlotWorker { /// The type of the future that will be returned when a new slot is /// triggered. - type OnSlot: IntoFuture; + type OnSlot: Future>; /// Called when a new slot is triggered. fn on_slot(&self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot; @@ -78,13 +73,14 @@ pub fn start_slot_worker( sync_oracle: SO, inherent_data_providers: InherentDataProviders, timestamp_extractor: SC, -) -> impl Future +) -> impl Future where B: BlockT, C: SelectChain + Clone, W: SlotWorker, + W::OnSlot: Unpin, SO: SyncOracle + Send + Clone, - SC: SlotCompatible, + SC: SlotCompatible + Unpin, T: SlotData + Clone, { let SlotDuration(slot_duration) = slot_duration; @@ -94,12 +90,12 @@ where slot_duration.slot_duration(), inherent_data_providers, timestamp_extractor, - ).map_err(|e| debug!(target: "slots", "Faulty timer: {:?}", e)) - .for_each(move |slot_info| { + ).inspect_err(|e| debug!(target: "slots", "Faulty timer: {:?}", e)) + .try_for_each(move |slot_info| { // only propose when we are not syncing. if sync_oracle.is_major_syncing() { debug!(target: "slots", "Skipping proposal slot due to sync."); - return Either::B(future::ok(())); + return Either::Right(future::ready(Ok(()))); } let slot_num = slot_info.number; @@ -108,23 +104,23 @@ where Err(e) => { warn!(target: "slots", "Unable to author block in slot {}. \ no best block header: {:?}", slot_num, e); - return Either::B(future::ok(())); + return Either::Right(future::ready(Ok(()))); } }; - Either::A(worker.on_slot(chain_head, slot_info).into_future().map_err( - |e| warn!(target: "slots", "Encountered consensus error: {:?}", e), + Either::Left(worker.on_slot(chain_head, slot_info).map_err( + |e| { warn!(target: "slots", "Encountered consensus error: {:?}", e); e } )) }); - future::poll_fn(move || + future::poll_fn(move |cx| { loop { - let mut authorship = std::panic::AssertUnwindSafe(&mut authorship); - match std::panic::catch_unwind(move || authorship.poll()) { - Ok(Ok(Async::Ready(()))) => + match panic::catch_unwind(panic::AssertUnwindSafe(|| Future::poll(Pin::new(&mut authorship), cx))) { + Ok(Poll::Ready(Ok(()))) => warn!(target: "slots", "Slots stream has terminated unexpectedly."), - Ok(Ok(Async::NotReady)) => break Ok(Async::NotReady), - Ok(Err(())) => warn!(target: "slots", "Authorship task terminated unexpectedly. Restarting"), + Ok(Poll::Pending) => break Poll::Pending, + Ok(Poll::Ready(Err(_err))) => + warn!(target: "slots", "Authorship task terminated unexpectedly. Restarting"), Err(e) => { if let Some(s) = e.downcast_ref::<&'static str>() { warn!(target: "slots", "Authorship task panicked at {:?}", s); @@ -134,7 +130,7 @@ where } } } - ) + }) } /// A header which has been checked diff --git a/substrate/core/consensus/slots/src/slots.rs b/substrate/core/consensus/slots/src/slots.rs index 0142f32b82..4e51cf0d84 100644 --- a/substrate/core/consensus/slots/src/slots.rs +++ b/substrate/core/consensus/slots/src/slots.rs @@ -16,16 +16,15 @@ //! Utility stream for yielding slots in a loop. //! -//! This is used instead of `tokio_timer::Interval` because it was unreliable. +//! This is used instead of `futures_timer::Interval` because it was unreliable. use super::SlotCompatible; use consensus_common::Error; -use futures::prelude::*; -use futures::try_ready; +use futures::{prelude::*, task::Context, task::Poll}; use inherents::{InherentData, InherentDataProviders}; -use std::time::{Duration, Instant}; -use tokio_timer::Delay; +use std::{pin::Pin, time::{Duration, Instant}}; +use futures_timer::Delay; /// Returns current duration since unix epoch. pub fn duration_now() -> Duration { @@ -121,47 +120,51 @@ impl Slots { } } -impl Stream for Slots { - type Item = SlotInfo; - type Error = Error; +impl Stream for Slots { + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { let slot_duration = self.slot_duration; self.inner_delay = match self.inner_delay.take() { None => { // schedule wait. - let wait_until = Instant::now() + time_until_next(duration_now(), slot_duration); - Some(Delay::new(wait_until)) + let wait_dur = time_until_next(duration_now(), slot_duration); + Some(Delay::new(wait_dur)) } Some(d) => Some(d), }; if let Some(ref mut inner_delay) = self.inner_delay { - try_ready!(inner_delay - .poll() - .map_err(Error::FaultyTimer)); + match Future::poll(Pin::new(inner_delay), cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(Error::FaultyTimer(err)))), + Poll::Ready(Ok(())) => {} + } } // timeout has fired. - let inherent_data = self - .inherent_data_providers - .create_inherent_data() - .map_err(|s| consensus_common::Error::InherentData(s.into_owned()))?; - let (timestamp, slot_num, offset) = self - .timestamp_extractor - .extract_timestamp_and_slot(&inherent_data)?; + let inherent_data = match self.inherent_data_providers.create_inherent_data() { + Ok(id) => id, + Err(err) => return Poll::Ready(Some(Err(consensus_common::Error::InherentData(err.into_owned())))), + }; + let result = self.timestamp_extractor.extract_timestamp_and_slot(&inherent_data); + let (timestamp, slot_num, offset) = match result { + Ok(v) => v, + Err(err) => return Poll::Ready(Some(Err(err))), + }; // reschedule delay for next slot. - let ends_at = Instant::now() + offset + + let ends_in = offset + time_until_next(Duration::from_secs(timestamp), slot_duration); - self.inner_delay = Some(Delay::new(ends_at)); + let ends_at = Instant::now() + ends_in; + self.inner_delay = Some(Delay::new(ends_in)); // never yield the same slot twice. if slot_num > self.last_slot { self.last_slot = slot_num; - break Ok(Async::Ready(Some(SlotInfo { + break Poll::Ready(Some(Ok(SlotInfo { number: slot_num, duration: self.slot_duration, timestamp, diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index c8ae6681de..5b4c880c89 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -31,6 +31,7 @@ use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; use consensus::import_queue::{ImportQueue, Link}; use consensus::import_queue::{BlockImportResult, BlockImportError}; use futures::{prelude::*, sync::mpsc}; +use futures03::TryFutureExt as _; use log::{warn, error, info}; use libp2p::{PeerId, Multiaddr, multihash::Multihash}; use libp2p::core::{transport::boxed::Boxed, muxing::StreamMuxerBox}; @@ -583,9 +584,12 @@ impl, H: ExHashT> Future for Ne fn poll(&mut self) -> Poll { // Poll the import queue for actions to perform. - self.import_queue.poll_actions(&mut NetworkLink { - protocol: &mut self.network_service, - }); + let _ = futures03::future::poll_fn(|cx| { + self.import_queue.poll_actions(cx, &mut NetworkLink { + protocol: &mut self.network_service, + }); + std::task::Poll::Pending::> + }).compat().poll(); // Check for new incoming on-demand requests. if let Some(on_demand_in) = self.on_demand_in.as_mut() { diff --git a/substrate/core/service/src/chain_ops.rs b/substrate/core/service/src/chain_ops.rs index dd2d26f8a3..6e7151ee0a 100644 --- a/substrate/core/service/src/chain_ops.rs +++ b/substrate/core/service/src/chain_ops.rs @@ -18,6 +18,7 @@ use std::{self, io::{Read, Write}}; use futures::prelude::*; +use futures03::TryFutureExt as _; use log::{info, warn}; use runtime_primitives::generic::{SignedBlock, BlockId}; @@ -193,7 +194,10 @@ pub fn import_blocks( } let blocks_before = link.imported_blocks; - queue.poll_actions(&mut link); + let _ = futures03::future::poll_fn(|cx| { + queue.poll_actions(cx, &mut link); + std::task::Poll::Pending::> + }).compat().poll(); if link.imported_blocks / 1000 != blocks_before / 1000 { info!( "#{} blocks were imported (#{} left)", diff --git a/substrate/node/cli/Cargo.toml b/substrate/node/cli/Cargo.toml index a9f3efdaa5..d3d48ebe01 100644 --- a/substrate/node/cli/Cargo.toml +++ b/substrate/node/cli/Cargo.toml @@ -47,6 +47,7 @@ balances = { package = "srml-balances", path = "../../srml/balances" } babe = { package = "substrate-consensus-babe", path = "../../core/consensus/babe", features = ["test-helpers"] } consensus-common = { package = "substrate-consensus-common", path = "../../core/consensus/common" } service-test = { package = "substrate-service-test", path = "../../core/service/test" } +futures03 = { package = "futures-preview", version = "0.3.0-alpha.17" } [build-dependencies] cli = { package = "substrate-cli", path = "../../core/cli" } diff --git a/substrate/node/cli/src/service.rs b/substrate/node/cli/src/service.rs index f75e97a8fe..8938d36ddf 100644 --- a/substrate/node/cli/src/service.rs +++ b/substrate/node/cli/src/service.rs @@ -381,11 +381,11 @@ mod tests { digest.push(::babe_pre_digest(babe_pre_digest)); let proposer = proposer_factory.init(&parent_header).unwrap(); - let new_block = proposer.propose( + let new_block = futures03::executor::block_on(proposer.propose( inherent_data, digest, std::time::Duration::from_secs(1), - ).expect("Error making test block"); + )).expect("Error making test block"); let (new_header, new_body) = new_block.deconstruct(); let pre_hash = new_header.hash();