diff --git a/substrate/core/network-libp2p/src/behaviour.rs b/substrate/core/network-libp2p/src/behaviour.rs index c6401ef79e..a10f3697d8 100644 --- a/substrate/core/network-libp2p/src/behaviour.rs +++ b/substrate/core/network-libp2p/src/behaviour.rs @@ -26,9 +26,9 @@ use libp2p::kad::{Kademlia, KademliaOut}; use libp2p::mdns::{Mdns, MdnsEvent}; use libp2p::ping::{Ping, PingEvent}; use log::{debug, trace, warn}; -use std::{cmp, io, fmt, time::Duration, time::Instant}; +use std::{cmp, io, fmt, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_timer::Delay; +use tokio_timer::{Delay, clock::Clock}; use void; /// General behaviour of the network. @@ -73,14 +73,16 @@ impl Behaviour { kademlia.add_connected_address(peer_id, addr.clone()); } + let clock = Clock::new(); Behaviour { ping: Ping::new(), custom_protocols, discovery: DiscoveryBehaviour { user_defined: known_addresses, kademlia, - next_kad_random_query: Delay::new(Instant::now()), + next_kad_random_query: Delay::new(clock.now()), duration_to_next_kad: Duration::from_secs(1), + clock, }, identify, mdns: if enable_mdns { @@ -331,6 +333,8 @@ pub struct DiscoveryBehaviour { next_kad_random_query: Delay, /// After `next_kad_random_query` triggers, the next one triggers after this duration. duration_to_next_kad: Duration, + /// `Clock` instance that uses the current execution context's source of time. + clock: Clock, } impl NetworkBehaviour for DiscoveryBehaviour @@ -408,7 +412,7 @@ where self.kademlia.find_node(random_peer_id); // Reset the `Delay` to the next random. - self.next_kad_random_query.reset(Instant::now() + self.duration_to_next_kad); + self.next_kad_random_query.reset(self.clock.now() + self.duration_to_next_kad); self.duration_to_next_kad = cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60)); }, diff --git a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs index 7f29711a51..ca1363fab1 100644 --- a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs @@ -24,6 +24,7 @@ use log::{debug, error, trace, warn}; use smallvec::SmallVec; use std::{collections::hash_map::Entry, cmp, error, io, marker::PhantomData, mem, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::clock::Clock; /// Network behaviour that handles opening substreams for custom protocols with other nodes. /// @@ -78,6 +79,9 @@ pub struct CustomProto { /// Marker to pin the generics. marker: PhantomData, + + /// `Clock` instance that uses the current execution context's source of time. + clock: Clock, } /// State of a peer we're connected to. @@ -214,6 +218,7 @@ impl CustomProto { next_incoming_index: substrate_peerset::IncomingIndex(0), events: SmallVec::new(), marker: PhantomData, + clock: Clock::new(), } } @@ -244,7 +249,7 @@ impl CustomProto { debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()); let banned_until = Some(if let Some(ban) = ban { - cmp::max(timer.deadline(), Instant::now() + ban) + cmp::max(timer.deadline(), self.clock.now() + ban) } else { timer.deadline() }); @@ -260,7 +265,8 @@ impl CustomProto { peer_id: peer_id.clone(), event: CustomProtoHandlerIn::Disable, }); - let banned_until = ban.map(|dur| Instant::now() + dur); + let clock = &self.clock; + let banned_until = ban.map(|dur| clock.now() + dur); *entry.into_mut() = PeerState::Disabled { open, connected_point, banned_until } }, @@ -281,7 +287,8 @@ impl CustomProto { peer_id: peer_id.clone(), event: CustomProtoHandlerIn::Disable, }); - let banned_until = ban.map(|dur| Instant::now() + dur); + let clock = &self.clock; + let banned_until = ban.map(|dur| clock.now() + dur); *entry.into_mut() = PeerState::Disabled { open: false, connected_point, banned_until } }, @@ -369,7 +376,7 @@ impl CustomProto { }; match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) { - PeerState::Banned { ref until } if *until > Instant::now() => { + PeerState::Banned { ref until } if *until > self.clock.now() => { debug!(target: "sub-libp2p", "PSM => Connect({:?}): Will start to connect at \ until {:?}", occ_entry.key(), until); *occ_entry.into_mut() = PeerState::PendingRequest { @@ -385,7 +392,7 @@ impl CustomProto { }, PeerState::Disabled { open, ref connected_point, banned_until: Some(ref banned) } - if *banned > Instant::now() => { + if *banned > self.clock.now() => { debug!(target: "sub-libp2p", "PSM => Connect({:?}): Has idle connection through \ {:?} but node is banned until {:?}", occ_entry.key(), connected_point, banned); *occ_entry.into_mut() = PeerState::DisabledPendingEnable { @@ -758,7 +765,7 @@ where PeerState::Requested | PeerState::PendingRequest { .. } => { debug!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id); *entry.into_mut() = PeerState::Banned { - until: Instant::now() + Duration::from_secs(5) + until: self.clock.now() + Duration::from_secs(5) }; debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()) diff --git a/substrate/core/network-libp2p/src/custom_proto/handler.rs b/substrate/core/network-libp2p/src/custom_proto/handler.rs index 969df7799b..b3c577ce4c 100644 --- a/substrate/core/network-libp2p/src/custom_proto/handler.rs +++ b/substrate/core/network-libp2p/src/custom_proto/handler.rs @@ -28,7 +28,7 @@ use log::{debug, error, warn}; use smallvec::{smallvec, SmallVec}; use std::{error, fmt, io, marker::PhantomData, mem, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_timer::Delay; +use tokio_timer::{Delay, clock::Clock}; use void::Void; /// Implements the `IntoProtocolsHandler` trait of libp2p. @@ -119,15 +119,17 @@ where type Handler = CustomProtoHandler; fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler { + let clock = Clock::new(); CustomProtoHandler { protocol: self.protocol, remote_peer_id: remote_peer_id.clone(), state: ProtocolState::Init { substreams: SmallVec::new(), - init_deadline: Delay::new(Instant::now() + Duration::from_secs(5)) + init_deadline: Delay::new(clock.now() + Duration::from_secs(5)) }, events_queue: SmallVec::new(), - warm_up_end: Instant::now() + Duration::from_secs(5), + warm_up_end: clock.now() + Duration::from_secs(5), + clock, } } } @@ -153,6 +155,10 @@ pub struct CustomProtoHandler { /// We have a warm-up period after creating the handler during which we don't shut down the /// connection. warm_up_end: Instant, + + /// `Clock` instance that uses the current execution context's source of time. + clock: Clock, + } /// State of the handler. @@ -404,7 +410,7 @@ where }); } ProtocolState::Opening { - deadline: Delay::new(Instant::now() + Duration::from_secs(60)) + deadline: Delay::new(self.clock.now() + Duration::from_secs(60)) } } else if incoming.iter().any(|s| s.is_multiplex()) { @@ -514,7 +520,7 @@ where ProtocolState::Init { substreams, mut init_deadline } => { match init_deadline.poll() { Ok(Async::Ready(())) => { - init_deadline.reset(Instant::now() + Duration::from_secs(60)); + init_deadline.reset(self.clock.now() + Duration::from_secs(60)); error!(target: "sub-libp2p", "Handler initialization process is too long \ with {:?}", self.remote_peer_id) }, @@ -529,7 +535,7 @@ where ProtocolState::Opening { mut deadline } => { match deadline.poll() { Ok(Async::Ready(())) => { - deadline.reset(Instant::now() + Duration::from_secs(60)); + deadline.reset(self.clock.now() + Duration::from_secs(60)); let event = CustomProtoHandlerOut::ProtocolError { is_severe: true, error: "Timeout when opening protocol".to_string().into(), @@ -543,7 +549,7 @@ where }, Err(_) => { error!(target: "sub-libp2p", "Tokio timer has errored"); - deadline.reset(Instant::now() + Duration::from_secs(60)); + deadline.reset(self.clock.now() + Duration::from_secs(60)); return_value = None; ProtocolState::Opening { deadline } }, @@ -613,7 +619,7 @@ where info: (), }); ProtocolState::Opening { - deadline: Delay::new(Instant::now() + Duration::from_secs(60)) + deadline: Delay::new(self.clock.now() + Duration::from_secs(60)) } } else { return_value = None; @@ -829,7 +835,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { } fn connection_keep_alive(&self) -> KeepAlive { - if self.warm_up_end >= Instant::now() { + if self.warm_up_end >= self.clock.now() { return KeepAlive::Until(self.warm_up_end) }