substrate-network-libp2p uses tokio_timer::clock::Clock to get current time (#2296)

This commit is contained in:
Marek Kotewicz
2019-04-19 13:24:53 +02:00
committed by Bastian Köcher
parent c2065aafd6
commit 294115c67d
3 changed files with 36 additions and 19 deletions
@@ -26,9 +26,9 @@ use libp2p::kad::{Kademlia, KademliaOut};
use libp2p::mdns::{Mdns, MdnsEvent};
use libp2p::ping::{Ping, PingEvent};
use log::{debug, trace, warn};
-use std::{cmp, io, fmt, time::Duration, time::Instant};
+use std::{cmp, io, fmt, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
-use tokio_timer::Delay;
+use tokio_timer::{Delay, clock::Clock};
use void;
/// General behaviour of the network.
@@ -73,14 +73,16 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
kademlia.add_connected_address(peer_id, addr.clone());
}
+let clock = Clock::new();
Behaviour {
ping: Ping::new(),
custom_protocols,
discovery: DiscoveryBehaviour {
user_defined: known_addresses,
kademlia,
-next_kad_random_query: Delay::new(Instant::now()),
+next_kad_random_query: Delay::new(clock.now()),
duration_to_next_kad: Duration::from_secs(1),
+clock,
},
identify,
mdns: if enable_mdns {
@@ -331,6 +333,8 @@ pub struct DiscoveryBehaviour<TSubstream> {
next_kad_random_query: Delay,
/// After `next_kad_random_query` triggers, the next one triggers after this duration.
duration_to_next_kad: Duration,
+/// `Clock` instance that uses the current execution context's source of time.
+clock: Clock,
}
impl<TSubstream> NetworkBehaviour for DiscoveryBehaviour<TSubstream>
@@ -408,7 +412,7 @@ where
self.kademlia.find_node(random_peer_id);
// Reset the `Delay` to the next random.
-self.next_kad_random_query.reset(Instant::now() + self.duration_to_next_kad);
+self.next_kad_random_query.reset(self.clock.now() + self.duration_to_next_kad);
self.duration_to_next_kad = cmp::min(self.duration_to_next_kad * 2,
Duration::from_secs(60));
},
@@ -24,6 +24,7 @@ use log::{debug, error, trace, warn};
use smallvec::SmallVec;
use std::{collections::hash_map::Entry, cmp, error, io, marker::PhantomData, mem, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_timer::clock::Clock;
/// Network behaviour that handles opening substreams for custom protocols with other nodes.
///
@@ -78,6 +79,9 @@ pub struct CustomProto<TMessage, TSubstream> {
/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
+/// `Clock` instance that uses the current execution context's source of time.
+clock: Clock,
}
/// State of a peer we're connected to.
@@ -214,6 +218,7 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
next_incoming_index: substrate_peerset::IncomingIndex(0),
events: SmallVec::new(),
marker: PhantomData,
+clock: Clock::new(),
}
}
@@ -244,7 +249,7 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id.clone());
let banned_until = Some(if let Some(ban) = ban {
-cmp::max(timer.deadline(), Instant::now() + ban)
+cmp::max(timer.deadline(), self.clock.now() + ban)
} else {
timer.deadline()
});
@@ -260,7 +265,8 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
peer_id: peer_id.clone(),
event: CustomProtoHandlerIn::Disable,
});
-let banned_until = ban.map(|dur| Instant::now() + dur);
+let clock = &self.clock;
+let banned_until = ban.map(|dur| clock.now() + dur);
*entry.into_mut() = PeerState::Disabled { open, connected_point, banned_until }
},
@@ -281,7 +287,8 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
peer_id: peer_id.clone(),
event: CustomProtoHandlerIn::Disable,
});
-let banned_until = ban.map(|dur| Instant::now() + dur);
+let clock = &self.clock;
+let banned_until = ban.map(|dur| clock.now() + dur);
*entry.into_mut() = PeerState::Disabled { open: false, connected_point, banned_until }
},
@@ -369,7 +376,7 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
};
match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) {
-PeerState::Banned { ref until } if *until > Instant::now() => {
+PeerState::Banned { ref until } if *until > self.clock.now() => {
debug!(target: "sub-libp2p", "PSM => Connect({:?}): Will start to connect at \
until {:?}", occ_entry.key(), until);
*occ_entry.into_mut() = PeerState::PendingRequest {
@@ -385,7 +392,7 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
},
PeerState::Disabled { open, ref connected_point, banned_until: Some(ref banned) }
-if *banned > Instant::now() => {
+if *banned > self.clock.now() => {
debug!(target: "sub-libp2p", "PSM => Connect({:?}): Has idle connection through \
{:?} but node is banned until {:?}", occ_entry.key(), connected_point, banned);
*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
@@ -758,7 +765,7 @@ where
PeerState::Requested | PeerState::PendingRequest { .. } => {
debug!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id);
*entry.into_mut() = PeerState::Banned {
-until: Instant::now() + Duration::from_secs(5)
+until: self.clock.now() + Duration::from_secs(5)
};
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id.clone())
@@ -28,7 +28,7 @@ use log::{debug, error, warn};
use smallvec::{smallvec, SmallVec};
use std::{error, fmt, io, marker::PhantomData, mem, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
-use tokio_timer::Delay;
+use tokio_timer::{Delay, clock::Clock};
use void::Void;
/// Implements the `IntoProtocolsHandler` trait of libp2p.
@@ -119,15 +119,17 @@ where
type Handler = CustomProtoHandler<TMessage, TSubstream>;
fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler {
+let clock = Clock::new();
CustomProtoHandler {
protocol: self.protocol,
remote_peer_id: remote_peer_id.clone(),
state: ProtocolState::Init {
substreams: SmallVec::new(),
-init_deadline: Delay::new(Instant::now() + Duration::from_secs(5))
+init_deadline: Delay::new(clock.now() + Duration::from_secs(5))
},
events_queue: SmallVec::new(),
-warm_up_end: Instant::now() + Duration::from_secs(5),
+warm_up_end: clock.now() + Duration::from_secs(5),
+clock,
}
}
}
@@ -153,6 +155,10 @@ pub struct CustomProtoHandler<TMessage, TSubstream> {
/// We have a warm-up period after creating the handler during which we don't shut down the
/// connection.
warm_up_end: Instant,
+/// `Clock` instance that uses the current execution context's source of time.
+clock: Clock,
}
/// State of the handler.
@@ -404,7 +410,7 @@ where
});
}
ProtocolState::Opening {
-deadline: Delay::new(Instant::now() + Duration::from_secs(60))
+deadline: Delay::new(self.clock.now() + Duration::from_secs(60))
}
} else if incoming.iter().any(|s| s.is_multiplex()) {
@@ -514,7 +520,7 @@ where
ProtocolState::Init { substreams, mut init_deadline } => {
match init_deadline.poll() {
Ok(Async::Ready(())) => {
-init_deadline.reset(Instant::now() + Duration::from_secs(60));
+init_deadline.reset(self.clock.now() + Duration::from_secs(60));
error!(target: "sub-libp2p", "Handler initialization process is too long \
with {:?}", self.remote_peer_id)
},
@@ -529,7 +535,7 @@ where
ProtocolState::Opening { mut deadline } => {
match deadline.poll() {
Ok(Async::Ready(())) => {
-deadline.reset(Instant::now() + Duration::from_secs(60));
+deadline.reset(self.clock.now() + Duration::from_secs(60));
let event = CustomProtoHandlerOut::ProtocolError {
is_severe: true,
error: "Timeout when opening protocol".to_string().into(),
@@ -543,7 +549,7 @@ where
},
Err(_) => {
error!(target: "sub-libp2p", "Tokio timer has errored");
-deadline.reset(Instant::now() + Duration::from_secs(60));
+deadline.reset(self.clock.now() + Duration::from_secs(60));
return_value = None;
ProtocolState::Opening { deadline }
},
@@ -613,7 +619,7 @@ where
info: (),
});
ProtocolState::Opening {
-deadline: Delay::new(Instant::now() + Duration::from_secs(60))
+deadline: Delay::new(self.clock.now() + Duration::from_secs(60))
}
} else {
return_value = None;
@@ -829,7 +835,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
}
fn connection_keep_alive(&self) -> KeepAlive {
-if self.warm_up_end >= Instant::now() {
+if self.warm_up_end >= self.clock.now() {
return KeepAlive::Until(self.warm_up_end)
}