diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 9a65cf8f4b..6fcd6982e0 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4621,14 +4621,13 @@ dependencies = [ name = "substrate-peerset" version = "2.0.0" dependencies = [ - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/substrate/core/network/src/custom_proto/behaviour.rs b/substrate/core/network/src/custom_proto/behaviour.rs index 727977326f..f6510c1a39 100644 --- a/substrate/core/network/src/custom_proto/behaviour.rs +++ b/substrate/core/network/src/custom_proto/behaviour.rs @@ -19,11 +19,12 @@ use crate::custom_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOu use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol}; use fnv::FnvHashMap; use futures::prelude::*; +use futures03::{StreamExt as _, TryStreamExt as _}; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p::core::{Multiaddr, PeerId}; use log::{debug, error, trace, warn}; use smallvec::SmallVec; -use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem}; +use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, pin::Pin}; use std::time::{Duration, Instant}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::clock::Clock; @@ -942,7 +943,10 @@ where // Poll for instructions from the peerset. // Note that the peerset is a *best effort* crate, and we have to use defensive programming. loop { - match self.peerset.poll() { + let mut peerset01 = futures03::stream::poll_fn(|cx| + futures03::Stream::poll_next(Pin::new(&mut self.peerset), cx) + ).map(|v| Ok::<_, ()>(v)).compat(); + match peerset01.poll() { Ok(Async::Ready(Some(peerset::Message::Accept(index)))) => { self.peerset_report_accept(index); } diff --git a/substrate/core/peerset/Cargo.toml b/substrate/core/peerset/Cargo.toml index 705e2f1447..91e9d58e0a 100644 --- a/substrate/core/peerset/Cargo.toml +++ b/substrate/core/peerset/Cargo.toml @@ -8,7 +8,7 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] -futures = "0.1" +futures-preview = "0.3.0-alpha.17" libp2p = { version = "0.10.0", default-features = false } linked-hash-map = "0.5" log = "0.4" @@ -17,4 +17,3 @@ serde_json = "1.0.24" [dev-dependencies] rand = "0.6" -tokio = "0.1" diff --git a/substrate/core/peerset/src/lib.rs b/substrate/core/peerset/src/lib.rs index d95a098093..763c94d0d6 100644 --- a/substrate/core/peerset/src/lib.rs +++ b/substrate/core/peerset/src/lib.rs @@ -20,10 +20,11 @@ mod peersstate; use std::{collections::{HashSet, HashMap}, collections::VecDeque, time::Instant}; -use futures::{prelude::*, sync::mpsc, try_ready}; +use futures::{prelude::*, channel::mpsc, stream::Fuse}; use libp2p::PeerId; use log::{debug, error, trace}; use serde_json::json; +use std::{pin::Pin, task::Context, task::Poll}; /// We don't accept nodes whose reputation is under this value. const BANNED_THRESHOLD: i32 = 82 * (i32::min_value() / 100); @@ -155,7 +156,7 @@ pub struct Peerset { data: peersstate::PeersState, /// If true, we only accept reserved nodes. reserved_only: bool, - rx: mpsc::UnboundedReceiver, + rx: Fuse>, message_queue: VecDeque, /// When the `Peerset` was created. created: Instant, @@ -174,7 +175,7 @@ impl Peerset { let mut peerset = Peerset { data: peersstate::PeersState::new(config.in_peers, config.out_peers), - rx, + rx: rx.fuse(), reserved_only: config.reserved_only, message_queue: VecDeque::new(), created: Instant::now(), @@ -457,24 +458,34 @@ impl Peerset { impl Stream for Peerset { type Item = Message; - type Error = (); - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { if let Some(message) = self.message_queue.pop_front() { - return Ok(Async::Ready(Some(message))); + return Poll::Ready(Some(message)); } - match try_ready!(self.rx.poll()) { - None => return Ok(Async::NotReady), - Some(action) => match action { - Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id), - Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id), - Action::SetReservedOnly(reserved) => self.on_set_reserved_only(reserved), - Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff), - Action::SetPriorityGroup(group_id, peers) => self.on_set_priority_group(&group_id, peers), - Action::AddToPriorityGroup(group_id, peer_id) => self.on_add_to_priority_group(&group_id, peer_id), - Action::RemoveFromPriorityGroup(group_id, peer_id) => self.on_remove_from_priority_group(&group_id, peer_id), - } + + let action = match Stream::poll_next(Pin::new(&mut self.rx), cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Some(event)) => event, + Poll::Ready(None) => return Poll::Pending, + }; + + match action { + Action::AddReservedPeer(peer_id) => + self.on_add_reserved_peer(peer_id), + Action::RemoveReservedPeer(peer_id) => + self.on_remove_reserved_peer(peer_id), + Action::SetReservedOnly(reserved) => + self.on_set_reserved_only(reserved), + Action::ReportPeer(peer_id, score_diff) => + self.on_report_peer(peer_id, score_diff), + Action::SetPriorityGroup(group_id, peers) => + self.on_set_priority_group(&group_id, peers), + Action::AddToPriorityGroup(group_id, peer_id) => + self.on_add_to_priority_group(&group_id, peer_id), + Action::RemoveFromPriorityGroup(group_id, peer_id) => + self.on_remove_from_priority_group(&group_id, peer_id), } } } @@ -485,7 +496,7 @@ mod tests { use libp2p::PeerId; use futures::prelude::*; use super::{PeersetConfig, Peerset, Message, IncomingIndex, BANNED_THRESHOLD}; - use std::{thread, time::Duration}; + use std::{pin::Pin, task::Poll, thread, time::Duration}; fn assert_messages(mut peerset: Peerset, messages: Vec) -> Peerset { for expected_message in messages { @@ -497,10 +508,8 @@ mod tests { peerset } - fn next_message(peerset: Peerset) -> Result<(Message, Peerset), ()> { - let (next, peerset) = peerset.into_future() - .wait() - .map_err(|_| ())?; + fn next_message(mut peerset: Peerset) -> Result<(Message, Peerset), ()> { + let next = futures::executor::block_on_stream(&mut peerset).next(); let message = next.ok_or_else(|| ())?; Ok((message, peerset)) } @@ -598,13 +607,13 @@ mod tests { let peer_id = PeerId::random(); handle.report_peer(peer_id.clone(), BANNED_THRESHOLD - 1); - let fut = futures::future::poll_fn(move || -> Result<_, ()> { + let fut = futures::future::poll_fn(move |cx| { // We need one polling for the message to be processed. - assert_eq!(peerset.poll().unwrap(), Async::NotReady); + assert_eq!(Stream::poll_next(Pin::new(&mut peerset), cx), Poll::Pending); // Check that an incoming connection from that node gets refused. peerset.incoming(peer_id.clone(), IncomingIndex(1)); - if let Async::Ready(msg) = peerset.poll().unwrap() { + if let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) { assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1))); } else { panic!() @@ -615,14 +624,14 @@ mod tests { // Try again. This time the node should be accepted. peerset.incoming(peer_id.clone(), IncomingIndex(2)); - while let Async::Ready(msg) = peerset.poll().unwrap() { + while let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) { assert_eq!(msg.unwrap(), Message::Accept(IncomingIndex(2))); } - Ok(Async::Ready(())) + Poll::Ready(()) }); - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(fut).unwrap(); + futures::executor::block_on(fut); } } diff --git a/substrate/core/peerset/tests/fuzz.rs b/substrate/core/peerset/tests/fuzz.rs index 42a7f2770c..be29916c34 100644 --- a/substrate/core/peerset/tests/fuzz.rs +++ b/substrate/core/peerset/tests/fuzz.rs @@ -18,7 +18,7 @@ use futures::prelude::*; use libp2p::PeerId; use rand::distributions::{Distribution, Uniform, WeightedIndex}; use rand::seq::IteratorRandom; -use std::{collections::HashMap, collections::HashSet, iter}; +use std::{collections::HashMap, collections::HashSet, iter, pin::Pin, task::Poll}; use substrate_peerset::{IncomingIndex, Message, PeersetConfig, Peerset}; #[test] @@ -54,7 +54,7 @@ fn test_once() { out_peers: Uniform::new_inclusive(0, 25).sample(&mut rng), }); - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(futures::future::poll_fn(move || -> Result<_, ()> { + futures::executor::block_on(futures::future::poll_fn(move |cx| { // List of nodes the user of `peerset` assumes it's connected to. Always a subset of // `known_nodes`. let mut connected_nodes = HashSet::::new(); @@ -71,20 +71,20 @@ fn test_once() { let action_weights = [150, 90, 90, 30, 30, 1, 1, 4, 4]; match WeightedIndex::new(&action_weights).unwrap().sample(&mut rng) { // If we generate 0, poll the peerset. - 0 => match peerset.poll().unwrap() { - Async::Ready(Some(Message::Connect(id))) => { + 0 => match Stream::poll_next(Pin::new(&mut peerset), cx) { + Poll::Ready(Some(Message::Connect(id))) => { if let Some(id) = incoming_nodes.iter().find(|(_, v)| **v == id).map(|(&id, _)| id) { incoming_nodes.remove(&id); } assert!(connected_nodes.insert(id)); } - Async::Ready(Some(Message::Drop(id))) => { connected_nodes.remove(&id); } - Async::Ready(Some(Message::Accept(n))) => + Poll::Ready(Some(Message::Drop(id))) => { connected_nodes.remove(&id); } + Poll::Ready(Some(Message::Accept(n))) => assert!(connected_nodes.insert(incoming_nodes.remove(&n).unwrap())), - Async::Ready(Some(Message::Reject(n))) => + Poll::Ready(Some(Message::Reject(n))) => assert!(!connected_nodes.contains(&incoming_nodes.remove(&n).unwrap())), - Async::Ready(None) => panic!(), - Async::NotReady => {} + Poll::Ready(None) => panic!(), + Poll::Pending => {} } // If we generate 1, discover a new node. @@ -133,6 +133,6 @@ fn test_once() { } } - Ok(Async::Ready(())) - })).unwrap(); + Poll::Ready(()) + })); } diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 46a4e80d51..8da3c0c0f4 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -30,7 +30,6 @@ use std::net::SocketAddr; use std::collections::HashMap; use std::time::Duration; use futures::sync::mpsc; -use futures03::{StreamExt as _, TryStreamExt as _}; use parking_lot::Mutex; use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT};