From 1e77717b26dc5ee74d7f225db27f32631075e0d3 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 19 Jul 2019 00:52:19 +0200 Subject: [PATCH] Switch the network timers to new futures (#3117) --- substrate/Cargo.lock | 2 +- substrate/core/network/Cargo.toml | 2 +- .../network/src/custom_proto/behaviour.rs | 73 +++++++++++-------- .../core/network/src/custom_proto/handler.rs | 24 +++--- substrate/core/network/src/debug_info.rs | 7 +- substrate/core/network/src/discovery.rs | 13 ++-- substrate/core/network/src/protocol.rs | 9 ++- substrate/core/network/src/test/sync.rs | 7 +- 8 files changed, 71 insertions(+), 66 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 4a985f432f..93cb8a6f23 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4519,6 +4519,7 @@ dependencies = [ "fork-tree 2.0.0", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "linked_hash_set 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4546,7 +4547,6 @@ dependencies = [ "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "unsigned-varint 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "zeroize 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index 8b34317470..a94a3a01ba 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -16,6 +16,7 @@ bitflags = "1.0" fnv = "1.0" futures = "0.1.17" futures03 = { package = "futures-preview", version = "0.3.0-alpha.17", features = ["compat"] } +futures-timer = "0.2.1" linked-hash-map = "0.5" linked_hash_set = "0.1.3" lru-cache = "0.1.1" @@ -35,7 +36,6 @@ slog = { version = "^2", features = ["nested-values"] } slog_derive = "0.1.1" smallvec = "0.6" tokio-io = "0.1" -tokio-timer = "0.2.11" tokio = { version = "0.1.11", optional = true } unsigned-varint = { version = "0.2.1", features = ["codec"] } keyring = { package = "substrate-keyring", path = "../../core/keyring", optional = true } diff --git a/substrate/core/network/src/custom_proto/behaviour.rs b/substrate/core/network/src/custom_proto/behaviour.rs index f6510c1a39..e54f5910e3 100644 --- a/substrate/core/network/src/custom_proto/behaviour.rs +++ b/substrate/core/network/src/custom_proto/behaviour.rs @@ -19,7 +19,7 @@ use crate::custom_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOu use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol}; use fnv::FnvHashMap; use futures::prelude::*; -use futures03::{StreamExt as _, TryStreamExt as _}; +use futures03::{compat::Compat, TryFutureExt as _, StreamExt as _, TryStreamExt as _}; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p::core::{Multiaddr, PeerId}; use log::{debug, error, trace, warn}; @@ -27,7 +27,6 @@ use smallvec::SmallVec; use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, pin::Pin}; use std::time::{Duration, Instant}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_timer::clock::Clock; /// Network behaviour that handles opening substreams for custom protocols with other nodes. /// @@ -82,9 +81,6 @@ pub struct CustomProto { /// Marker to pin the generics. marker: PhantomData, - - /// `Clock` instance that uses the current execution context's source of time. - clock: Clock, } /// State of a peer we're connected to. @@ -105,7 +101,9 @@ enum PeerState { /// The peerset requested that we connect to this peer. We are not connected to this node. PendingRequest { /// When to actually start dialing. - timer: tokio_timer::Delay, + timer: Compat, + /// When the `timer` will trigger. + timer_deadline: Instant, }, /// The peerset requested that we connect to this peer. We are currently dialing this peer. @@ -135,7 +133,9 @@ enum PeerState { /// state mismatch. open: bool, /// When to enable this remote. - timer: tokio_timer::Delay, + timer: Compat, + /// When the `timer` will trigger. + timer_deadline: Instant, }, /// We are connected to this peer and the peerset has accepted it. The handler is in the @@ -240,7 +240,6 @@ impl CustomProto { next_incoming_index: peerset::IncomingIndex(0), events: SmallVec::new(), marker: PhantomData, - clock: Clock::new(), } } @@ -277,13 +276,13 @@ impl CustomProto { st @ PeerState::Banned { .. } => *entry.into_mut() = st, // DisabledPendingEnable => Disabled. - PeerState::DisabledPendingEnable { open, connected_point, timer } => { + PeerState::DisabledPendingEnable { open, connected_point, timer_deadline, .. } => { debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()); let banned_until = Some(if let Some(ban) = ban { - cmp::max(timer.deadline(), self.clock.now() + ban) + cmp::max(timer_deadline, Instant::now() + ban) } else { - timer.deadline() + timer_deadline }); *entry.into_mut() = PeerState::Disabled { open, connected_point, banned_until } }, @@ -297,8 +296,7 @@ impl CustomProto { peer_id: peer_id.clone(), event: CustomProtoHandlerIn::Disable, }); - let clock = &self.clock; - let banned_until = ban.map(|dur| clock.now() + dur); + let banned_until = ban.map(|dur| Instant::now() + dur); *entry.into_mut() = PeerState::Disabled { open, connected_point, banned_until } }, @@ -319,8 +317,7 @@ impl CustomProto { peer_id: peer_id.clone(), event: CustomProtoHandlerIn::Disable, }); - let clock = &self.clock; - let banned_until = ban.map(|dur| clock.now() + dur); + let banned_until = ban.map(|dur| Instant::now() + dur); *entry.into_mut() = PeerState::Disabled { open: false, connected_point, banned_until } }, @@ -385,11 +382,12 @@ impl CustomProto { }; match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) { - PeerState::Banned { ref until } if *until > self.clock.now() => { + PeerState::Banned { ref until } if *until > Instant::now() => { debug!(target: "sub-libp2p", "PSM => Connect({:?}): Will start to connect at \ until {:?}", occ_entry.key(), until); *occ_entry.into_mut() = PeerState::PendingRequest { - timer: tokio_timer::Delay::new(until.clone()), + timer: futures_timer::Delay::new_at(until.clone()).compat(), + timer_deadline: until.clone(), }; }, @@ -401,13 +399,14 @@ impl CustomProto { }, PeerState::Disabled { open, ref connected_point, banned_until: Some(ref banned) } - if *banned > self.clock.now() => { + if *banned > Instant::now() => { debug!(target: "sub-libp2p", "PSM => Connect({:?}): Has idle connection through \ {:?} but node is banned until {:?}", occ_entry.key(), connected_point, banned); *occ_entry.into_mut() = PeerState::DisabledPendingEnable { connected_point: connected_point.clone(), open, - timer: tokio_timer::Delay::new(banned.clone()), + timer: futures_timer::Delay::new_at(banned.clone()).compat(), + timer_deadline: banned.clone(), }; }, @@ -477,13 +476,13 @@ impl CustomProto { *entry.into_mut() = st; }, - PeerState::DisabledPendingEnable { open, connected_point, timer } => { + PeerState::DisabledPendingEnable { open, connected_point, timer_deadline, .. } => { debug!(target: "sub-libp2p", "PSM => Drop({:?}): Interrupting pending \ enable", entry.key()); *entry.into_mut() = PeerState::Disabled { open, connected_point, - banned_until: Some(timer.deadline()), + banned_until: Some(timer_deadline), }; }, @@ -508,9 +507,9 @@ impl CustomProto { debug!(target: "sub-libp2p", "PSM => Drop({:?}): Was not yet connected", entry.key()); entry.remove(); }, - PeerState::PendingRequest { timer } => { + PeerState::PendingRequest { timer_deadline, .. } => { debug!(target: "sub-libp2p", "PSM => Drop({:?}): Was not yet connected", entry.key()); - *entry.into_mut() = PeerState::Banned { until: timer.deadline() } + *entry.into_mut() = PeerState::Banned { until: timer_deadline } }, PeerState::Poisoned => @@ -721,12 +720,12 @@ where } } - Some(PeerState::DisabledPendingEnable { open, timer, .. }) => { + Some(PeerState::DisabledPendingEnable { open, timer_deadline, .. }) => { debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was disabled \ (through {:?}) but pending enable", peer_id, endpoint); debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()); - self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer.deadline() }); + self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer_deadline }); if open { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); let event = CustomProtoOut::CustomProtocolClosed { @@ -790,7 +789,7 @@ where PeerState::Requested | PeerState::PendingRequest { .. } => { debug!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id); *entry.into_mut() = PeerState::Banned { - until: self.clock.now() + Duration::from_secs(5) + until: Instant::now() + Duration::from_secs(5) }; debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()) @@ -860,9 +859,14 @@ where debug_assert!(open); *entry.into_mut() = PeerState::Disabled { open: false, connected_point, banned_until }; }, - PeerState::DisabledPendingEnable { open, connected_point, timer } => { + PeerState::DisabledPendingEnable { open, connected_point, timer, timer_deadline } => { debug_assert!(open); - *entry.into_mut() = PeerState::DisabledPendingEnable { open: false, connected_point, timer }; + *entry.into_mut() = PeerState::DisabledPendingEnable { + open: false, + connected_point, + timer, + timer_deadline + }; }, _ => error!(target: "sub-libp2p", "State mismatch in the custom protos handler"), } @@ -973,9 +977,9 @@ where for (peer_id, peer_state) in self.peers.iter_mut() { match mem::replace(peer_state, PeerState::Poisoned) { - PeerState::PendingRequest { mut timer } => { + PeerState::PendingRequest { mut timer, timer_deadline } => { if let Ok(Async::NotReady) = timer.poll() { - *peer_state = PeerState::PendingRequest { timer }; + *peer_state = PeerState::PendingRequest { timer, timer_deadline }; continue; } @@ -984,9 +988,14 @@ where *peer_state = PeerState::Requested; } - PeerState::DisabledPendingEnable { mut timer, connected_point, open } => { + PeerState::DisabledPendingEnable { mut timer, connected_point, open, timer_deadline } => { if let Ok(Async::NotReady) = timer.poll() { - *peer_state = PeerState::DisabledPendingEnable { timer, connected_point, open }; + *peer_state = PeerState::DisabledPendingEnable { + timer, + connected_point, + open, + timer_deadline + }; continue; } diff --git a/substrate/core/network/src/custom_proto/handler.rs b/substrate/core/network/src/custom_proto/handler.rs index 0ec60e79cd..e4832b64b6 100644 --- a/substrate/core/network/src/custom_proto/handler.rs +++ b/substrate/core/network/src/custom_proto/handler.rs @@ -17,6 +17,8 @@ use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol}; use crate::custom_proto::upgrade::{RegisteredProtocolEvent, RegisteredProtocolSubstream}; use futures::prelude::*; +use futures03::{compat::Compat, TryFutureExt as _}; +use futures_timer::Delay; use libp2p::core::{ ConnectedPoint, PeerId, Endpoint, ProtocolsHandler, ProtocolsHandlerEvent, protocols_handler::IntoProtocolsHandler, @@ -29,7 +31,6 @@ use log::{debug, error}; use smallvec::{smallvec, SmallVec}; use std::{borrow::Cow, error, fmt, io, marker::PhantomData, mem, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_timer::{Delay, clock::Clock}; /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -119,17 +120,15 @@ where } fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { - let clock = Clock::new(); CustomProtoHandler { protocol: self.protocol, endpoint: connected_point.to_endpoint(), remote_peer_id: remote_peer_id.clone(), state: ProtocolState::Init { substreams: SmallVec::new(), - init_deadline: Delay::new(clock.now() + Duration::from_secs(5)) + init_deadline: Delay::new(Duration::from_secs(5)).compat() }, events_queue: SmallVec::new(), - clock, } } } @@ -155,9 +154,6 @@ pub struct CustomProtoHandler { /// This queue must only ever be modified to insert elements at the back, or remove the first /// element. events_queue: SmallVec<[ProtocolsHandlerEvent, (), CustomProtoHandlerOut>; 16]>, - - /// `Clock` instance that uses the current execution context's source of time. - clock: Clock, } /// State of the handler. @@ -167,14 +163,14 @@ enum ProtocolState { /// List of substreams opened by the remote but that haven't been processed yet. substreams: SmallVec<[RegisteredProtocolSubstream; 6]>, /// Deadline after which the initialization is abnormally long. - init_deadline: Delay, + init_deadline: Compat, }, /// Handler is opening a substream in order to activate itself. /// If we are in this state, we haven't sent any `CustomProtocolOpen` yet. Opening { /// Deadline after which the opening is abnormally long. - deadline: Delay, + deadline: Compat, }, /// Normal operating mode. Contains the substreams that are open. @@ -286,7 +282,7 @@ where }); } ProtocolState::Opening { - deadline: Delay::new(self.clock.now() + Duration::from_secs(60)) + deadline: Delay::new(Duration::from_secs(60)).compat() } } else { @@ -356,7 +352,7 @@ where ProtocolState::Init { substreams, mut init_deadline } => { match init_deadline.poll() { Ok(Async::Ready(())) => { - init_deadline.reset(self.clock.now() + Duration::from_secs(60)); + init_deadline = Delay::new(Duration::from_secs(60)).compat(); error!(target: "sub-libp2p", "Handler initialization process is too long \ with {:?}", self.remote_peer_id) }, @@ -371,7 +367,7 @@ where ProtocolState::Opening { mut deadline } => { match deadline.poll() { Ok(Async::Ready(())) => { - deadline.reset(self.clock.now() + Duration::from_secs(60)); + deadline = Delay::new(Duration::from_secs(60)).compat(); let event = CustomProtoHandlerOut::ProtocolError { is_severe: true, error: "Timeout when opening protocol".to_string().into(), @@ -385,7 +381,7 @@ where }, Err(_) => { error!(target: "sub-libp2p", "Tokio timer has errored"); - deadline.reset(self.clock.now() + Duration::from_secs(60)); + deadline = Delay::new(Duration::from_secs(60)).compat(); self.state = ProtocolState::Opening { deadline }; None }, @@ -454,7 +450,7 @@ where // after all the substreams are closed. if reenable && shutdown.is_empty() { self.state = ProtocolState::Opening { - deadline: Delay::new(self.clock.now() + Duration::from_secs(60)) + deadline: Delay::new(Duration::from_secs(60)).compat() }; Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(self.protocol.clone()), diff --git a/substrate/core/network/src/debug_info.rs b/substrate/core/network/src/debug_info.rs index f8e688acba..2ab93b04b0 100644 --- a/substrate/core/network/src/debug_info.rs +++ b/substrate/core/network/src/debug_info.rs @@ -16,6 +16,7 @@ use fnv::FnvHashMap; use futures::prelude::*; +use futures03::{StreamExt as _, TryStreamExt as _}; use libp2p::Multiaddr; use libp2p::core::{either::EitherOutput, PeerId, PublicKey}; use libp2p::core::protocols_handler::{IntoProtocolsHandler, IntoProtocolsHandlerSelect, ProtocolsHandler}; @@ -27,7 +28,7 @@ use log::{debug, trace, error}; use std::collections::hash_map::Entry; use std::time::{Duration, Instant}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_timer::Interval; +use futures_timer::Interval; /// Time after we disconnect from a node before we purge its information from the cache. const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60); @@ -44,7 +45,7 @@ pub struct DebugInfoBehaviour { /// Information that we know about all nodes. nodes_info: FnvHashMap, /// Interval at which we perform garbage collection in `nodes_info`. - garbage_collect: Interval, + garbage_collect: Box + Send>, } /// Information about a node we're connected to. @@ -76,7 +77,7 @@ impl DebugInfoBehaviour { ping: Ping::new(PingConfig::new()), identify, nodes_info: FnvHashMap::default(), - garbage_collect: Interval::new_interval(GARBAGE_COLLECT_INTERVAL), + garbage_collect: Box::new(Interval::new(GARBAGE_COLLECT_INTERVAL).map(|()| Ok(())).compat()), } } diff --git a/substrate/core/network/src/discovery.rs b/substrate/core/network/src/discovery.rs index 1a377ba872..87aa966e10 100644 --- a/substrate/core/network/src/discovery.rs +++ b/substrate/core/network/src/discovery.rs @@ -46,6 +46,8 @@ //! use futures::prelude::*; +use futures_timer::Delay; +use futures03::{compat::Compat, TryFutureExt as _}; use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey}; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; use libp2p::core::swarm::PollParameters; @@ -59,7 +61,6 @@ use libp2p::multiaddr::Protocol; use log::{debug, info, trace, warn}; use std::{cmp, collections::VecDeque, num::NonZeroU8, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_timer::{Delay, clock::Clock}; /// Implementation of `NetworkBehaviour` that discovers the nodes on the network. pub struct DiscoveryBehaviour { @@ -72,13 +73,11 @@ pub struct DiscoveryBehaviour { #[cfg(not(target_os = "unknown"))] mdns: Toggle>>, /// Stream that fires when we need to perform the next random Kademlia query. - next_kad_random_query: Delay, + next_kad_random_query: Compat, /// After `next_kad_random_query` triggers, the next one triggers after this duration. duration_to_next_kad: Duration, /// Discovered nodes to return. discoveries: VecDeque, - /// `Clock` instance that uses the current execution context's source of time. - clock: Clock, /// Identity of our local node. local_peer_id: PeerId, /// Number of nodes we're currently connected to. @@ -104,14 +103,12 @@ impl DiscoveryBehaviour { kademlia.add_address(peer_id, addr.clone()); } - let clock = Clock::new(); DiscoveryBehaviour { user_defined, kademlia, - next_kad_random_query: Delay::new(clock.now()), + next_kad_random_query: Delay::new(Duration::new(0, 0)).compat(), duration_to_next_kad: Duration::from_secs(1), discoveries: VecDeque::new(), - clock, local_peer_id: local_public_key.into_peer_id(), num_connections: 0, #[cfg(not(target_os = "unknown"))] @@ -276,7 +273,7 @@ where self.kademlia.find_node(random_peer_id); // Reset the `Delay` to the next random. - self.next_kad_random_query.reset(self.clock.now() + self.duration_to_next_kad); + self.next_kad_random_query = Delay::new(self.duration_to_next_kad).compat(); self.duration_to_next_kad = cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60)); }, diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 50b6ad2741..97b6c7ac28 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -17,6 +17,7 @@ use crate::{DiscoveryNetBehaviour, config::ProtocolId}; use crate::custom_proto::{CustomProto, CustomProtoOut}; use futures::prelude::*; +use futures03::{StreamExt as _, TryStreamExt as _}; use libp2p::{Multiaddr, PeerId}; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; @@ -91,9 +92,9 @@ const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12); // Lock must always be taken in order declared here. pub struct Protocol, H: ExHashT> { /// Interval at which we call `tick`. - tick_timeout: tokio_timer::Interval, + tick_timeout: Box + Send>, /// Interval at which we call `propagate_extrinsics`. - propagate_timeout: tokio_timer::Interval, + propagate_timeout: Box + Send>, config: ProtocolConfig, /// Handler for on-demand requests. on_demand_core: OnDemandCore, @@ -365,8 +366,8 @@ impl, H: ExHashT> Protocol { let behaviour = CustomProto::new(protocol_id, versions, peerset); let protocol = Protocol { - tick_timeout: tokio_timer::Interval::new_interval(TICK_TIMEOUT), - propagate_timeout: tokio_timer::Interval::new_interval(PROPAGATE_TIMEOUT), + tick_timeout: Box::new(futures_timer::Interval::new(TICK_TIMEOUT).map(|v| Ok::<_, ()>(v)).compat()), + propagate_timeout: Box::new(futures_timer::Interval::new(PROPAGATE_TIMEOUT).map(|v| Ok::<_, ()>(v)).compat()), config: config, context_data: ContextData { peers: HashMap::new(), diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index a7603f7551..f3a8f0c8ea 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -17,7 +17,8 @@ use client::{backend::Backend, blockchain::HeaderBackend}; use crate::config::Roles; use consensus::BlockOrigin; -use std::{time::Duration, time::Instant}; +use futures03::TryFutureExt as _; +use std::time::Duration; use tokio::runtime::current_thread; use super::*; @@ -398,7 +399,7 @@ fn blocks_are_not_announced_by_light_nodes() { net.peers.remove(0); // Poll for a few seconds and make sure 1 and 2 (now 0 and 1) don't sync together. - let mut delay = tokio_timer::Delay::new(Instant::now() + Duration::from_secs(5)); + let mut delay = futures_timer::Delay::new(Duration::from_secs(5)).compat(); runtime.block_on(futures::future::poll_fn::<(), (), _>(|| { net.poll(); delay.poll().map_err(|_| ()) @@ -486,7 +487,7 @@ fn can_not_sync_from_light_peer() { net.peers.remove(0); // ensure that the #2 (now #1) fails to sync block #1 even after 5 seconds - let mut test_finished = tokio_timer::Delay::new(Instant::now() + Duration::from_secs(5)); + let mut test_finished = futures_timer::Delay::new(Duration::from_secs(5)).compat(); runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { net.poll(); test_finished.poll().map_err(|_| ())