Add timeouts for initialization and opening (#1948)

* Add timeouts for initialization and opening

* Don't keep alive opening and init

* Reset deadline when timer error
This commit is contained in:
Pierre Krieger
2019-03-11 12:09:03 +01:00
committed by Bastian Köcher
parent ffd803c3d8
commit a3a5031d9d
2 changed files with 106 additions and 55 deletions
@@ -45,13 +45,13 @@ pub struct CustomProtos<TMessage, TSubstream> {
/// List of custom protocols that we have open with remotes.
open_protocols: Vec<(PeerId, ProtocolId)>,
/// List of peer handlers that were enabled, and whether we're dialing or listening.
/// List of peer handlers that were enabled.
///
/// Note that it is possible for a peer to be in the shutdown process, in which case it will
/// not be in this list but will be present in `open_protocols`.
/// It is also possible that we have *just* enabled a peer, in which case it will be in this
/// list but not in `open_protocols`.
enabled_peers: FnvHashMap<PeerId, ConnectedPoint>,
enabled_peers: FnvHashSet<PeerId>,
/// Maximum number of incoming non-reserved connections, taken from the config. Never modified.
max_incoming_connections: usize,
@@ -62,8 +62,8 @@ pub struct CustomProtos<TMessage, TSubstream> {
/// If true, only reserved peers can connect.
reserved_only: bool,
/// List of the IDs of the peers we are connected to.
connected_peers: FnvHashSet<PeerId>,
/// List of the IDs of the peers we are connected to, and whether we're dialing or listening.
connected_peers: FnvHashMap<PeerId, ConnectedPoint>,
/// List of the IDs of the reserved peers. We always try to maintain a connection with these peers.
reserved_peers: FnvHashSet<PeerId>,
@@ -170,7 +170,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
reserved_peers: Default::default(),
banned_peers: Vec::new(),
open_protocols: Vec::with_capacity(open_protos_cap),
enabled_peers: FnvHashMap::with_capacity_and_hasher(connec_cap, Default::default()),
enabled_peers: FnvHashSet::with_capacity_and_hasher(connec_cap, Default::default()),
next_connect_to_nodes: Delay::new(Instant::now()),
events: SmallVec::new(),
marker: PhantomData,
@@ -226,7 +226,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
// Disconnecting nodes that are connected to us and that aren't reserved
let reserved_peers = &mut self.reserved_peers;
let events = &mut self.events;
self.enabled_peers.retain(move |peer_id, _| {
self.enabled_peers.retain(move |peer_id| {
if reserved_peers.contains(peer_id) {
return true
}
@@ -240,7 +240,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
/// Disconnects the given peer if we are connected to it.
pub fn disconnect_peer(&mut self, peer: &PeerId) {
if self.enabled_peers.remove(peer).is_some() {
if self.enabled_peers.remove(peer) {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer.clone(),
event: CustomProtosHandlerIn::Disable,
@@ -260,7 +260,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
}
self.banned_peers.push((peer_id.clone(), Instant::now() + PEER_DISABLE_DURATION));
if self.enabled_peers.remove(&peer_id).is_some() {
if self.enabled_peers.remove(&peer_id) {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id,
event: CustomProtosHandlerIn::Disable,
@@ -275,7 +275,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
/// Returns true if we try to open protocols with the given peer.
pub fn is_enabled(&self, peer_id: &PeerId) -> bool {
self.enabled_peers.contains_key(peer_id)
self.enabled_peers.contains(peer_id)
}
/// Returns the list of protocols we have open with the given peer.
@@ -352,7 +352,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
// Make sure we are connected or connecting to all the reserved nodes.
for reserved in self.reserved_peers.iter() {
// TODO: don't generate an event if we're already in a pending connection (https://github.com/libp2p/rust-libp2p/issues/697)
if !self.enabled_peers.contains_key(&reserved) {
if !self.enabled_peers.contains(&reserved) {
self.events.push(NetworkBehaviourAction::DialPeer { peer_id: reserved.clone() });
}
}
@@ -370,8 +370,8 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
let mut num_to_open = {
let num_outgoing_connections = self.enabled_peers
.iter()
.filter(|(_, endpoint)| endpoint.is_dialer())
.filter(|(p, _)| !self.reserved_peers.contains(p))
.filter(|p| self.connected_peers.get(p).map(|c| c.is_dialer()).unwrap_or(false))
.filter(|p| !self.reserved_peers.contains(p))
.count();
self.max_outgoing_connections - num_outgoing_connections
};
@@ -379,12 +379,12 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
num_to_open);
// We first try to enable existing connections.
for peer_id in &self.connected_peers {
for peer_id in self.connected_peers.keys() {
if num_to_open == 0 {
break
}
if self.enabled_peers.contains_key(peer_id) {
if self.enabled_peers.contains(peer_id) {
continue;
}
@@ -414,7 +414,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
continue
}
if self.connected_peers.contains(&peer_id) {
if self.connected_peers.contains_key(&peer_id) {
continue
}
@@ -453,7 +453,7 @@ where
// When a peer connects, its handler is initially in the disabled state. We make sure that
// the peer is allowed, and if so we put it in the enabled state.
self.connected_peers.insert(peer_id.clone());
self.connected_peers.insert(peer_id.clone(), endpoint.clone());
let is_reserved = self.reserved_peers.contains(&peer_id);
if self.reserved_only && !is_reserved {
@@ -483,8 +483,8 @@ where
match endpoint {
ConnectedPoint::Dialer { .. } => {
let num_outgoing = self.enabled_peers.iter()
.filter(|(_, e)| e.is_dialer())
.filter(|(p, _)| !self.reserved_peers.contains(p))
.filter(|p| self.connected_peers.get(p).map(|c| c.is_dialer()).unwrap_or(false))
.filter(|p| !self.reserved_peers.contains(p))
.count();
debug_assert!(num_outgoing <= self.max_outgoing_connections);
@@ -498,8 +498,8 @@ where
}
ConnectedPoint::Listener { .. } => {
let num_ingoing = self.enabled_peers.iter()
.filter(|(_, e)| e.is_listener())
.filter(|(p, _)| !self.reserved_peers.contains(p))
.filter(|p| self.connected_peers.get(p).map(|c| c.is_listener()).unwrap_or(false))
.filter(|p| !self.reserved_peers.contains(p))
.count();
debug_assert!(num_ingoing <= self.max_incoming_connections);
@@ -516,7 +516,7 @@ where
}
// If everything is fine, enable the node.
debug_assert!(!self.enabled_peers.contains_key(&peer_id));
debug_assert!(!self.enabled_peers.contains(&peer_id));
// We ask the handler to actively open substreams only if we are the dialer; otherwise
// the two nodes will race to be the first to open the unique allowed substream.
if endpoint.is_dialer() {
@@ -534,12 +534,12 @@ where
}
self.topology.set_connected(&peer_id, &endpoint);
self.enabled_peers.insert(peer_id, endpoint);
self.enabled_peers.insert(peer_id);
}
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
let was_connected = self.connected_peers.remove(&peer_id);
debug_assert!(was_connected);
debug_assert!(was_connected.is_some());
self.topology.set_disconnected(peer_id, &endpoint);
@@ -611,16 +611,16 @@ where
));
self.open_protocols.push((source.clone(), protocol_id));
if let Some(address) = self.enabled_peers.get(&source) {
let event = CustomProtosOut::CustomProtocolOpen {
protocol_id,
version,
peer_id: source,
endpoint: address.clone()
};
let endpoint = self.connected_peers.get(&source)
.expect("We only receive events from connected nodes; QED").clone();
let event = CustomProtosOut::CustomProtocolOpen {
protocol_id,
version,
peer_id: source,
endpoint,
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
}
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
}
CustomProtosHandlerOut::CustomMessage { protocol_id, message } => {
debug_assert!(self.open_protocols.iter().any(|(s, p)|
@@ -654,6 +654,7 @@ where
} else {
debug!(target: "sub-libp2p", "Network misbehaviour from {:?} with protocol \
{:?}: {:?}", source, protocol_id, error);
self.disconnect_peer(&source);
}
}
}
@@ -28,6 +28,7 @@ use log::{debug, error, warn};
use smallvec::{smallvec, SmallVec};
use std::{error, fmt, io, mem, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
use void::Void;
/// Implements the `ProtocolsHandler` trait of libp2p.
@@ -89,12 +90,19 @@ struct PerProtocol<TMessage, TSubstream> {
/// State of the handler for a specific protocol.
enum PerProtocolState<TMessage, TSubstream> {
/// Waiting for the behaviour to tell the handler whether it is enabled or disabled.
/// Contains a list of substreams opened by the remote but that haven't been processed yet.
Init(SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 6]>),
Init {
/// List of substreams opened by the remote but that haven't been processed yet.
substreams: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 6]>,
/// Deadline after which the initialization is abnormally long.
init_deadline: Delay,
},
/// Handler is opening a substream in order to activate itself.
/// If we are in this state, we haven't sent any `CustomProtocolOpen` yet.
Opening,
Opening {
/// Deadline after which the opening is abnormally long.
deadline: Delay,
},
/// Backwards-compatible mode. Contains the unique substream that is open.
/// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
@@ -174,7 +182,7 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite {
PerProtocolState::Poisoned
}
PerProtocolState::Init(incoming) => {
PerProtocolState::Init { substreams: incoming, .. } => {
if incoming.is_empty() {
if let Endpoint::Dialer = endpoint {
return_value = Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
@@ -184,7 +192,9 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite {
} else {
return_value = None;
}
PerProtocolState::Opening
PerProtocolState::Opening {
deadline: Delay::new(Instant::now() + Duration::from_secs(60))
}
} else if incoming.iter().any(|s| s.is_multiplex()) {
let event = CustomProtosHandlerOut::CustomProtocolOpen {
@@ -215,7 +225,7 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite {
}
}
st @ PerProtocolState::Opening => { return_value = None; st }
st @ PerProtocolState::Opening { .. } => { return_value = None; st }
st @ PerProtocolState::BackCompat { .. } => { return_value = None; st }
st @ PerProtocolState::Normal { .. } => { return_value = None; st }
PerProtocolState::Disabled { shutdown, .. } => {
@@ -242,14 +252,14 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite {
PerProtocolState::Poisoned
}
PerProtocolState::Init(mut shutdown) => {
PerProtocolState::Init { substreams: mut shutdown, .. } => {
for s in &mut shutdown {
s.shutdown();
}
PerProtocolState::Disabled { shutdown, reenable: false }
}
PerProtocolState::Opening => {
PerProtocolState::Opening { .. } => {
PerProtocolState::Disabled { shutdown: SmallVec::new(), reenable: false }
}
@@ -296,12 +306,12 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite {
PerProtocolState::Poisoned
}
PerProtocolState::Init(mut list) => {
PerProtocolState::Init { substreams: mut list, .. } => {
for s in &mut list { s.shutdown(); }
PerProtocolState::ShuttingDown(list)
}
PerProtocolState::Opening => {
PerProtocolState::Opening { .. } => {
PerProtocolState::ShuttingDown(SmallVec::new())
}
@@ -347,14 +357,41 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite {
PerProtocolState::Poisoned
}
st @ PerProtocolState::Init(_) => {
PerProtocolState::Init { substreams, mut init_deadline } => {
match init_deadline.poll() {
Ok(Async::Ready(())) =>
error!(target: "sub-libp2p", "Handler initialization process is too long"),
Ok(Async::NotReady) => {}
Err(_) => error!(target: "sub-libp2p", "Tokio timer has errored")
}
return_value = None;
st
PerProtocolState::Init { substreams, init_deadline }
}
st @ PerProtocolState::Opening { .. } => {
return_value = None;
st
PerProtocolState::Opening { mut deadline } => {
match deadline.poll() {
Ok(Async::Ready(())) => {
deadline.reset(Instant::now() + Duration::from_secs(60));
let event = CustomProtosHandlerOut::ProtocolError {
protocol_id: self.protocol.id(),
is_severe: false,
error: "Timeout when opening protocol".to_string().into(),
};
return_value = Some(ProtocolsHandlerEvent::Custom(event));
PerProtocolState::Opening { deadline }
},
Ok(Async::NotReady) => {
return_value = None;
PerProtocolState::Opening { deadline }
},
Err(_) => {
error!(target: "sub-libp2p", "Tokio timer has errored");
deadline.reset(Instant::now() + Duration::from_secs(60));
return_value = None;
PerProtocolState::Opening { deadline }
},
}
}
PerProtocolState::BackCompat { mut substream, shutdown } => {
@@ -423,7 +460,9 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite {
upgrade: self.protocol.clone(),
info: self.protocol.id(),
});
PerProtocolState::Opening
PerProtocolState::Opening {
deadline: Delay::new(Instant::now() + Duration::from_secs(60))
}
} else {
return_value = None;
PerProtocolState::Disabled { shutdown, reenable }
@@ -622,7 +661,10 @@ where
protocols: protocols.0.into_iter().map(|protocol| {
PerProtocol {
protocol,
state: PerProtocolState::Init(SmallVec::new()),
state: PerProtocolState::Init {
substreams: SmallVec::new(),
init_deadline: Delay::new(Instant::now() + Duration::from_secs(5))
},
}
}).collect(),
events_queue: SmallVec::new(),
@@ -672,15 +714,15 @@ where
PerProtocolState::Poisoned
}
PerProtocolState::Init(mut incoming) => {
PerProtocolState::Init { mut substreams, init_deadline } => {
if substream.endpoint() == Endpoint::Dialer {
error!(target: "sub-libp2p", "Opened dialing substream before initialization");
}
incoming.push(substream);
PerProtocolState::Init(incoming)
substreams.push(substream);
PerProtocolState::Init { substreams, init_deadline }
}
PerProtocolState::Opening => {
PerProtocolState::Opening { .. } => {
let event = CustomProtosHandlerOut::CustomProtocolOpen {
protocol_id: substream.protocol_id(),
version: substream.protocol_version()
@@ -893,15 +935,23 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
return KeepAlive::Until(self.warm_up_end)
}
let mut keep_forever = false;
for protocol in self.protocols.iter() {
match protocol.state {
PerProtocolState::Init { .. } | PerProtocolState::Opening { .. } => {}
PerProtocolState::BackCompat { .. } | PerProtocolState::Normal { .. } =>
keep_forever = true,
PerProtocolState::Disabled { .. } | PerProtocolState::ShuttingDown(_) |
PerProtocolState::Poisoned => return KeepAlive::Now,
_ => {}
}
}
KeepAlive::Forever
if keep_forever {
KeepAlive::Forever
} else {
KeepAlive::Now
}
}
fn shutdown(&mut self) {