Switch the network timers to new futures (#3117)

This commit is contained in:
Pierre Krieger
2019-07-19 00:52:19 +02:00
committed by DemiMarie-parity
parent 991e9ae263
commit 1e77717b26
8 changed files with 71 additions and 66 deletions
@@ -19,7 +19,7 @@ use crate::custom_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOu
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol};
use fnv::FnvHashMap;
use futures::prelude::*;
use futures03::{StreamExt as _, TryStreamExt as _};
use futures03::{compat::Compat, TryFutureExt as _, StreamExt as _, TryStreamExt as _};
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::core::{Multiaddr, PeerId};
use log::{debug, error, trace, warn};
@@ -27,7 +27,6 @@ use smallvec::SmallVec;
use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, pin::Pin};
use std::time::{Duration, Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::clock::Clock;
/// Network behaviour that handles opening substreams for custom protocols with other nodes.
///
@@ -82,9 +81,6 @@ pub struct CustomProto<TMessage, TSubstream> {
/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
/// `Clock` instance that uses the current execution context's source of time.
clock: Clock,
}
/// State of a peer we're connected to.
@@ -105,7 +101,9 @@ enum PeerState {
/// The peerset requested that we connect to this peer. We are not connected to this node.
PendingRequest {
/// When to actually start dialing.
timer: tokio_timer::Delay,
timer: Compat<futures_timer::Delay>,
/// When the `timer` will trigger.
timer_deadline: Instant,
},
/// The peerset requested that we connect to this peer. We are currently dialing this peer.
@@ -135,7 +133,9 @@ enum PeerState {
/// state mismatch.
open: bool,
/// When to enable this remote.
timer: tokio_timer::Delay,
timer: Compat<futures_timer::Delay>,
/// When the `timer` will trigger.
timer_deadline: Instant,
},
/// We are connected to this peer and the peerset has accepted it. The handler is in the
@@ -240,7 +240,6 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
next_incoming_index: peerset::IncomingIndex(0),
events: SmallVec::new(),
marker: PhantomData,
clock: Clock::new(),
}
}
@@ -277,13 +276,13 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
st @ PeerState::Banned { .. } => *entry.into_mut() = st,
// DisabledPendingEnable => Disabled.
PeerState::DisabledPendingEnable { open, connected_point, timer } => {
PeerState::DisabledPendingEnable { open, connected_point, timer_deadline, .. } => {
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id.clone());
let banned_until = Some(if let Some(ban) = ban {
cmp::max(timer.deadline(), self.clock.now() + ban)
cmp::max(timer_deadline, Instant::now() + ban)
} else {
timer.deadline()
timer_deadline
});
*entry.into_mut() = PeerState::Disabled { open, connected_point, banned_until }
},
@@ -297,8 +296,7 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
peer_id: peer_id.clone(),
event: CustomProtoHandlerIn::Disable,
});
let clock = &self.clock;
let banned_until = ban.map(|dur| clock.now() + dur);
let banned_until = ban.map(|dur| Instant::now() + dur);
*entry.into_mut() = PeerState::Disabled { open, connected_point, banned_until }
},
@@ -319,8 +317,7 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
peer_id: peer_id.clone(),
event: CustomProtoHandlerIn::Disable,
});
let clock = &self.clock;
let banned_until = ban.map(|dur| clock.now() + dur);
let banned_until = ban.map(|dur| Instant::now() + dur);
*entry.into_mut() = PeerState::Disabled { open: false, connected_point, banned_until }
},
@@ -385,11 +382,12 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
};
match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) {
PeerState::Banned { ref until } if *until > self.clock.now() => {
PeerState::Banned { ref until } if *until > Instant::now() => {
debug!(target: "sub-libp2p", "PSM => Connect({:?}): Will start to connect at \
until {:?}", occ_entry.key(), until);
*occ_entry.into_mut() = PeerState::PendingRequest {
timer: tokio_timer::Delay::new(until.clone()),
timer: futures_timer::Delay::new_at(until.clone()).compat(),
timer_deadline: until.clone(),
};
},
@@ -401,13 +399,14 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
},
PeerState::Disabled { open, ref connected_point, banned_until: Some(ref banned) }
if *banned > self.clock.now() => {
if *banned > Instant::now() => {
debug!(target: "sub-libp2p", "PSM => Connect({:?}): Has idle connection through \
{:?} but node is banned until {:?}", occ_entry.key(), connected_point, banned);
*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
connected_point: connected_point.clone(),
open,
timer: tokio_timer::Delay::new(banned.clone()),
timer: futures_timer::Delay::new_at(banned.clone()).compat(),
timer_deadline: banned.clone(),
};
},
@@ -477,13 +476,13 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
*entry.into_mut() = st;
},
PeerState::DisabledPendingEnable { open, connected_point, timer } => {
PeerState::DisabledPendingEnable { open, connected_point, timer_deadline, .. } => {
debug!(target: "sub-libp2p", "PSM => Drop({:?}): Interrupting pending \
enable", entry.key());
*entry.into_mut() = PeerState::Disabled {
open,
connected_point,
banned_until: Some(timer.deadline()),
banned_until: Some(timer_deadline),
};
},
@@ -508,9 +507,9 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
debug!(target: "sub-libp2p", "PSM => Drop({:?}): Was not yet connected", entry.key());
entry.remove();
},
PeerState::PendingRequest { timer } => {
PeerState::PendingRequest { timer_deadline, .. } => {
debug!(target: "sub-libp2p", "PSM => Drop({:?}): Was not yet connected", entry.key());
*entry.into_mut() = PeerState::Banned { until: timer.deadline() }
*entry.into_mut() = PeerState::Banned { until: timer_deadline }
},
PeerState::Poisoned =>
@@ -721,12 +720,12 @@ where
}
}
Some(PeerState::DisabledPendingEnable { open, timer, .. }) => {
Some(PeerState::DisabledPendingEnable { open, timer_deadline, .. }) => {
debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was disabled \
(through {:?}) but pending enable", peer_id, endpoint);
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id.clone());
self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer.deadline() });
self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer_deadline });
if open {
debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id);
let event = CustomProtoOut::CustomProtocolClosed {
@@ -790,7 +789,7 @@ where
PeerState::Requested | PeerState::PendingRequest { .. } => {
debug!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id);
*entry.into_mut() = PeerState::Banned {
until: self.clock.now() + Duration::from_secs(5)
until: Instant::now() + Duration::from_secs(5)
};
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id.clone())
@@ -860,9 +859,14 @@ where
debug_assert!(open);
*entry.into_mut() = PeerState::Disabled { open: false, connected_point, banned_until };
},
PeerState::DisabledPendingEnable { open, connected_point, timer } => {
PeerState::DisabledPendingEnable { open, connected_point, timer, timer_deadline } => {
debug_assert!(open);
*entry.into_mut() = PeerState::DisabledPendingEnable { open: false, connected_point, timer };
*entry.into_mut() = PeerState::DisabledPendingEnable {
open: false,
connected_point,
timer,
timer_deadline
};
},
_ => error!(target: "sub-libp2p", "State mismatch in the custom protos handler"),
}
@@ -973,9 +977,9 @@ where
for (peer_id, peer_state) in self.peers.iter_mut() {
match mem::replace(peer_state, PeerState::Poisoned) {
PeerState::PendingRequest { mut timer } => {
PeerState::PendingRequest { mut timer, timer_deadline } => {
if let Ok(Async::NotReady) = timer.poll() {
*peer_state = PeerState::PendingRequest { timer };
*peer_state = PeerState::PendingRequest { timer, timer_deadline };
continue;
}
@@ -984,9 +988,14 @@ where
*peer_state = PeerState::Requested;
}
PeerState::DisabledPendingEnable { mut timer, connected_point, open } => {
PeerState::DisabledPendingEnable { mut timer, connected_point, open, timer_deadline } => {
if let Ok(Async::NotReady) = timer.poll() {
*peer_state = PeerState::DisabledPendingEnable { timer, connected_point, open };
*peer_state = PeerState::DisabledPendingEnable {
timer,
connected_point,
open,
timer_deadline
};
continue;
}
@@ -17,6 +17,8 @@
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol};
use crate::custom_proto::upgrade::{RegisteredProtocolEvent, RegisteredProtocolSubstream};
use futures::prelude::*;
use futures03::{compat::Compat, TryFutureExt as _};
use futures_timer::Delay;
use libp2p::core::{
ConnectedPoint, PeerId, Endpoint, ProtocolsHandler, ProtocolsHandlerEvent,
protocols_handler::IntoProtocolsHandler,
@@ -29,7 +31,6 @@ use log::{debug, error};
use smallvec::{smallvec, SmallVec};
use std::{borrow::Cow, error, fmt, io, marker::PhantomData, mem, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, clock::Clock};
/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
@@ -119,17 +120,15 @@ where
}
fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
let clock = Clock::new();
CustomProtoHandler {
protocol: self.protocol,
endpoint: connected_point.to_endpoint(),
remote_peer_id: remote_peer_id.clone(),
state: ProtocolState::Init {
substreams: SmallVec::new(),
init_deadline: Delay::new(clock.now() + Duration::from_secs(5))
init_deadline: Delay::new(Duration::from_secs(5)).compat()
},
events_queue: SmallVec::new(),
clock,
}
}
}
@@ -155,9 +154,6 @@ pub struct CustomProtoHandler<TMessage, TSubstream> {
/// This queue must only ever be modified to insert elements at the back, or remove the first
/// element.
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, (), CustomProtoHandlerOut<TMessage>>; 16]>,
/// `Clock` instance that uses the current execution context's source of time.
clock: Clock,
}
/// State of the handler.
@@ -167,14 +163,14 @@ enum ProtocolState<TMessage, TSubstream> {
/// List of substreams opened by the remote but that haven't been processed yet.
substreams: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 6]>,
/// Deadline after which the initialization is abnormally long.
init_deadline: Delay,
init_deadline: Compat<Delay>,
},
/// Handler is opening a substream in order to activate itself.
/// If we are in this state, we haven't sent any `CustomProtocolOpen` yet.
Opening {
/// Deadline after which the opening is abnormally long.
deadline: Delay,
deadline: Compat<Delay>,
},
/// Normal operating mode. Contains the substreams that are open.
@@ -286,7 +282,7 @@ where
});
}
ProtocolState::Opening {
deadline: Delay::new(self.clock.now() + Duration::from_secs(60))
deadline: Delay::new(Duration::from_secs(60)).compat()
}
} else {
@@ -356,7 +352,7 @@ where
ProtocolState::Init { substreams, mut init_deadline } => {
match init_deadline.poll() {
Ok(Async::Ready(())) => {
init_deadline.reset(self.clock.now() + Duration::from_secs(60));
init_deadline = Delay::new(Duration::from_secs(60)).compat();
error!(target: "sub-libp2p", "Handler initialization process is too long \
with {:?}", self.remote_peer_id)
},
@@ -371,7 +367,7 @@ where
ProtocolState::Opening { mut deadline } => {
match deadline.poll() {
Ok(Async::Ready(())) => {
deadline.reset(self.clock.now() + Duration::from_secs(60));
deadline = Delay::new(Duration::from_secs(60)).compat();
let event = CustomProtoHandlerOut::ProtocolError {
is_severe: true,
error: "Timeout when opening protocol".to_string().into(),
@@ -385,7 +381,7 @@ where
},
Err(_) => {
error!(target: "sub-libp2p", "Tokio timer has errored");
deadline.reset(self.clock.now() + Duration::from_secs(60));
deadline = Delay::new(Duration::from_secs(60)).compat();
self.state = ProtocolState::Opening { deadline };
None
},
@@ -454,7 +450,7 @@ where
// after all the substreams are closed.
if reenable && shutdown.is_empty() {
self.state = ProtocolState::Opening {
deadline: Delay::new(self.clock.now() + Duration::from_secs(60))
deadline: Delay::new(Duration::from_secs(60)).compat()
};
Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.protocol.clone()),
+4 -3
View File
@@ -16,6 +16,7 @@
use fnv::FnvHashMap;
use futures::prelude::*;
use futures03::{StreamExt as _, TryStreamExt as _};
use libp2p::Multiaddr;
use libp2p::core::{either::EitherOutput, PeerId, PublicKey};
use libp2p::core::protocols_handler::{IntoProtocolsHandler, IntoProtocolsHandlerSelect, ProtocolsHandler};
@@ -27,7 +28,7 @@ use log::{debug, trace, error};
use std::collections::hash_map::Entry;
use std::time::{Duration, Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Interval;
use futures_timer::Interval;
/// Time after we disconnect from a node before we purge its information from the cache.
const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60);
@@ -44,7 +45,7 @@ pub struct DebugInfoBehaviour<TSubstream> {
/// Information that we know about all nodes.
nodes_info: FnvHashMap<PeerId, NodeInfo>,
/// Interval at which we perform garbage collection in `nodes_info`.
garbage_collect: Interval,
garbage_collect: Box<dyn Stream<Item = (), Error = ()> + Send>,
}
/// Information about a node we're connected to.
@@ -76,7 +77,7 @@ impl<TSubstream> DebugInfoBehaviour<TSubstream> {
ping: Ping::new(PingConfig::new()),
identify,
nodes_info: FnvHashMap::default(),
garbage_collect: Interval::new_interval(GARBAGE_COLLECT_INTERVAL),
garbage_collect: Box::new(Interval::new(GARBAGE_COLLECT_INTERVAL).map(|()| Ok(())).compat()),
}
}
+5 -8
View File
@@ -46,6 +46,8 @@
//!
use futures::prelude::*;
use futures_timer::Delay;
use futures03::{compat::Compat, TryFutureExt as _};
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey};
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p::core::swarm::PollParameters;
@@ -59,7 +61,6 @@ use libp2p::multiaddr::Protocol;
use log::{debug, info, trace, warn};
use std::{cmp, collections::VecDeque, num::NonZeroU8, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, clock::Clock};
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
pub struct DiscoveryBehaviour<TSubstream> {
@@ -72,13 +73,11 @@ pub struct DiscoveryBehaviour<TSubstream> {
#[cfg(not(target_os = "unknown"))]
mdns: Toggle<Mdns<Substream<StreamMuxerBox>>>,
/// Stream that fires when we need to perform the next random Kademlia query.
next_kad_random_query: Delay,
next_kad_random_query: Compat<Delay>,
/// After `next_kad_random_query` triggers, the next one triggers after this duration.
duration_to_next_kad: Duration,
/// Discovered nodes to return.
discoveries: VecDeque<PeerId>,
/// `Clock` instance that uses the current execution context's source of time.
clock: Clock,
/// Identity of our local node.
local_peer_id: PeerId,
/// Number of nodes we're currently connected to.
@@ -104,14 +103,12 @@ impl<TSubstream> DiscoveryBehaviour<TSubstream> {
kademlia.add_address(peer_id, addr.clone());
}
let clock = Clock::new();
DiscoveryBehaviour {
user_defined,
kademlia,
next_kad_random_query: Delay::new(clock.now()),
next_kad_random_query: Delay::new(Duration::new(0, 0)).compat(),
duration_to_next_kad: Duration::from_secs(1),
discoveries: VecDeque::new(),
clock,
local_peer_id: local_public_key.into_peer_id(),
num_connections: 0,
#[cfg(not(target_os = "unknown"))]
@@ -276,7 +273,7 @@ where
self.kademlia.find_node(random_peer_id);
// Reset the `Delay` to the next random.
self.next_kad_random_query.reset(self.clock.now() + self.duration_to_next_kad);
self.next_kad_random_query = Delay::new(self.duration_to_next_kad).compat();
self.duration_to_next_kad = cmp::min(self.duration_to_next_kad * 2,
Duration::from_secs(60));
},
+5 -4
View File
@@ -17,6 +17,7 @@
use crate::{DiscoveryNetBehaviour, config::ProtocolId};
use crate::custom_proto::{CustomProto, CustomProtoOut};
use futures::prelude::*;
use futures03::{StreamExt as _, TryStreamExt as _};
use libp2p::{Multiaddr, PeerId};
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox};
@@ -91,9 +92,9 @@ const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12);
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
/// Interval at which we call `tick`.
tick_timeout: tokio_timer::Interval,
tick_timeout: Box<dyn Stream<Item = (), Error = ()> + Send>,
/// Interval at which we call `propagate_extrinsics`.
propagate_timeout: tokio_timer::Interval,
propagate_timeout: Box<dyn Stream<Item = (), Error = ()> + Send>,
config: ProtocolConfig,
/// Handler for on-demand requests.
on_demand_core: OnDemandCore<B>,
@@ -365,8 +366,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let behaviour = CustomProto::new(protocol_id, versions, peerset);
let protocol = Protocol {
tick_timeout: tokio_timer::Interval::new_interval(TICK_TIMEOUT),
propagate_timeout: tokio_timer::Interval::new_interval(PROPAGATE_TIMEOUT),
tick_timeout: Box::new(futures_timer::Interval::new(TICK_TIMEOUT).map(|v| Ok::<_, ()>(v)).compat()),
propagate_timeout: Box::new(futures_timer::Interval::new(PROPAGATE_TIMEOUT).map(|v| Ok::<_, ()>(v)).compat()),
config: config,
context_data: ContextData {
peers: HashMap::new(),
+4 -3
View File
@@ -17,7 +17,8 @@
use client::{backend::Backend, blockchain::HeaderBackend};
use crate::config::Roles;
use consensus::BlockOrigin;
use std::{time::Duration, time::Instant};
use futures03::TryFutureExt as _;
use std::time::Duration;
use tokio::runtime::current_thread;
use super::*;
@@ -398,7 +399,7 @@ fn blocks_are_not_announced_by_light_nodes() {
net.peers.remove(0);
// Poll for a few seconds and make sure 1 and 2 (now 0 and 1) don't sync together.
let mut delay = tokio_timer::Delay::new(Instant::now() + Duration::from_secs(5));
let mut delay = futures_timer::Delay::new(Duration::from_secs(5)).compat();
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
delay.poll().map_err(|_| ())
@@ -486,7 +487,7 @@ fn can_not_sync_from_light_peer() {
net.peers.remove(0);
// ensure that the #2 (now #1) fails to sync block #1 even after 5 seconds
let mut test_finished = tokio_timer::Delay::new(Instant::now() + Duration::from_secs(5));
let mut test_finished = futures_timer::Delay::new(Duration::from_secs(5)).compat();
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
test_finished.poll().map_err(|_| ())