diff --git a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs index 2aa3a5de98..db8fb27ad8 100644 --- a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs @@ -45,13 +45,13 @@ pub struct CustomProtos { /// List of custom protocols that we have open with remotes. open_protocols: Vec<(PeerId, ProtocolId)>, - /// List of peer handlers that were enabled, and whether we're dialing or listening. + /// List of peer handlers that were enabled. /// /// Note that it is possible for a peer to be in the shutdown process, in which case it will /// not be in this list but will be present in `open_protocols`. /// It is also possible that we have *just* enabled a peer, in which case it will be in this /// list but not in `open_protocols`. - enabled_peers: FnvHashMap, + enabled_peers: FnvHashSet, /// Maximum number of incoming non-reserved connections, taken from the config. Never modified. max_incoming_connections: usize, @@ -62,8 +62,8 @@ pub struct CustomProtos { /// If true, only reserved peers can connect. reserved_only: bool, - /// List of the IDs of the peers we are connected to. - connected_peers: FnvHashSet, + /// List of the IDs of the peers we are connected to, and whether we're dialing or listening. + connected_peers: FnvHashMap, /// List of the IDs of the reserved peers. We always try to maintain a connection these peers. reserved_peers: FnvHashSet, @@ -170,7 +170,7 @@ impl CustomProtos { reserved_peers: Default::default(), banned_peers: Vec::new(), open_protocols: Vec::with_capacity(open_protos_cap), - enabled_peers: FnvHashMap::with_capacity_and_hasher(connec_cap, Default::default()), + enabled_peers: FnvHashSet::with_capacity_and_hasher(connec_cap, Default::default()), next_connect_to_nodes: Delay::new(Instant::now()), events: SmallVec::new(), marker: PhantomData, @@ -226,7 +226,7 @@ impl CustomProtos { // Disconnecting nodes that are connected to us and that aren't reserved let reserved_peers = &mut self.reserved_peers; let events = &mut self.events; - self.enabled_peers.retain(move |peer_id, _| { + self.enabled_peers.retain(move |peer_id| { if reserved_peers.contains(peer_id) { return true } @@ -240,7 +240,7 @@ impl CustomProtos { /// Disconnects the given peer if we are connected to it. pub fn disconnect_peer(&mut self, peer: &PeerId) { - if self.enabled_peers.remove(peer).is_some() { + if self.enabled_peers.remove(peer) { self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer.clone(), event: CustomProtosHandlerIn::Disable, @@ -260,7 +260,7 @@ impl CustomProtos { } self.banned_peers.push((peer_id.clone(), Instant::now() + PEER_DISABLE_DURATION)); - if self.enabled_peers.remove(&peer_id).is_some() { + if self.enabled_peers.remove(&peer_id) { self.events.push(NetworkBehaviourAction::SendEvent { peer_id, event: CustomProtosHandlerIn::Disable, @@ -275,7 +275,7 @@ impl CustomProtos { /// Returns true if we try to open protocols with the given peer. pub fn is_enabled(&self, peer_id: &PeerId) -> bool { - self.enabled_peers.contains_key(peer_id) + self.enabled_peers.contains(peer_id) } /// Returns the list of protocols we have open with the given peer. @@ -352,7 +352,7 @@ impl CustomProtos { // Make sure we are connected or connecting to all the reserved nodes. for reserved in self.reserved_peers.iter() { // TODO: don't generate an event if we're already in a pending connection (https://github.com/libp2p/rust-libp2p/issues/697) - if !self.enabled_peers.contains_key(&reserved) { + if !self.enabled_peers.contains(&reserved) { self.events.push(NetworkBehaviourAction::DialPeer { peer_id: reserved.clone() }); } } @@ -370,8 +370,8 @@ impl CustomProtos { let mut num_to_open = { let num_outgoing_connections = self.enabled_peers .iter() - .filter(|(_, endpoint)| endpoint.is_dialer()) - .filter(|(p, _)| !self.reserved_peers.contains(p)) + .filter(|p| self.connected_peers.get(p).map(|c| c.is_dialer()).unwrap_or(false)) + .filter(|p| !self.reserved_peers.contains(p)) .count(); self.max_outgoing_connections - num_outgoing_connections }; @@ -379,12 +379,12 @@ impl CustomProtos { num_to_open); // We first try to enable existing connections. - for peer_id in &self.connected_peers { + for peer_id in self.connected_peers.keys() { if num_to_open == 0 { break } - if self.enabled_peers.contains_key(peer_id) { + if self.enabled_peers.contains(peer_id) { continue; } @@ -414,7 +414,7 @@ impl CustomProtos { continue } - if self.connected_peers.contains(&peer_id) { + if self.connected_peers.contains_key(&peer_id) { continue } @@ -453,7 +453,7 @@ where // When a peer connects, its handler is initially in the disabled state. We make sure that // the peer is allowed, and if so we put it in the enabled state. - self.connected_peers.insert(peer_id.clone()); + self.connected_peers.insert(peer_id.clone(), endpoint.clone()); let is_reserved = self.reserved_peers.contains(&peer_id); if self.reserved_only && !is_reserved { @@ -483,8 +483,8 @@ where match endpoint { ConnectedPoint::Dialer { .. } => { let num_outgoing = self.enabled_peers.iter() - .filter(|(_, e)| e.is_dialer()) - .filter(|(p, _)| !self.reserved_peers.contains(p)) + .filter(|p| self.connected_peers.get(p).map(|c| c.is_dialer()).unwrap_or(false)) + .filter(|p| !self.reserved_peers.contains(p)) .count(); debug_assert!(num_outgoing <= self.max_outgoing_connections); @@ -498,8 +498,8 @@ where } ConnectedPoint::Listener { .. } => { let num_ingoing = self.enabled_peers.iter() - .filter(|(_, e)| e.is_listener()) - .filter(|(p, _)| !self.reserved_peers.contains(p)) + .filter(|p| self.connected_peers.get(p).map(|c| c.is_listener()).unwrap_or(false)) + .filter(|p| !self.reserved_peers.contains(p)) .count(); debug_assert!(num_ingoing <= self.max_incoming_connections); @@ -516,7 +516,7 @@ where } // If everything is fine, enable the node. - debug_assert!(!self.enabled_peers.contains_key(&peer_id)); + debug_assert!(!self.enabled_peers.contains(&peer_id)); // We ask the handler to actively open substreams only if we are the dialer; otherwise // the two nodes will race to be the first to open the unique allowed substream. if endpoint.is_dialer() { @@ -534,12 +534,12 @@ where } self.topology.set_connected(&peer_id, &endpoint); - self.enabled_peers.insert(peer_id, endpoint); + self.enabled_peers.insert(peer_id); } fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { let was_connected = self.connected_peers.remove(&peer_id); - debug_assert!(was_connected); + debug_assert!(was_connected.is_some()); self.topology.set_disconnected(peer_id, &endpoint); @@ -611,16 +611,16 @@ where )); self.open_protocols.push((source.clone(), protocol_id)); - if let Some(address) = self.enabled_peers.get(&source) { - let event = CustomProtosOut::CustomProtocolOpen { - protocol_id, - version, - peer_id: source, - endpoint: address.clone() - }; + let endpoint = self.connected_peers.get(&source) + .expect("We only receive events from connected nodes; QED").clone(); + let event = CustomProtosOut::CustomProtocolOpen { + protocol_id, + version, + peer_id: source, + endpoint, + }; - self.events.push(NetworkBehaviourAction::GenerateEvent(event)); - } + self.events.push(NetworkBehaviourAction::GenerateEvent(event)); } CustomProtosHandlerOut::CustomMessage { protocol_id, message } => { debug_assert!(self.open_protocols.iter().any(|(s, p)| @@ -654,6 +654,7 @@ where } else { debug!(target: "sub-libp2p", "Network misbehaviour from {:?} with protocol \ {:?}: {:?}", source, protocol_id, error); + self.disconnect_peer(&source); } } } diff --git a/substrate/core/network-libp2p/src/custom_proto/handler.rs b/substrate/core/network-libp2p/src/custom_proto/handler.rs index d7b8da5f74..bb326923e1 100644 --- a/substrate/core/network-libp2p/src/custom_proto/handler.rs +++ b/substrate/core/network-libp2p/src/custom_proto/handler.rs @@ -28,6 +28,7 @@ use log::{debug, error, warn}; use smallvec::{smallvec, SmallVec}; use std::{error, fmt, io, mem, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::Delay; use void::Void; /// Implements the `ProtocolsHandler` trait of libp2p. @@ -89,12 +90,19 @@ struct PerProtocol { /// State of the handler for a specific protocol. enum PerProtocolState { /// Waiting for the behaviour to tell the handler whether it is enabled or disabled. - /// Contains a list of substreams opened by the remote but that haven't been processed yet. - Init(SmallVec<[RegisteredProtocolSubstream; 6]>), + Init { + /// List of substreams opened by the remote but that haven't been processed yet. + substreams: SmallVec<[RegisteredProtocolSubstream; 6]>, + /// Deadline after which the initialization is abnormally long. + init_deadline: Delay, + }, /// Handler is opening a substream in order to activate itself. /// If we are in this state, we haven't sent any `CustomProtocolOpen` yet. - Opening, + Opening { + /// Deadline after which the opening is abnormally long. + deadline: Delay, + }, /// Backwards-compatible mode. Contains the unique substream that is open. /// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside. @@ -174,7 +182,7 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { PerProtocolState::Poisoned } - PerProtocolState::Init(incoming) => { + PerProtocolState::Init { substreams: incoming, .. } => { if incoming.is_empty() { if let Endpoint::Dialer = endpoint { return_value = Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { @@ -184,7 +192,9 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { } else { return_value = None; } - PerProtocolState::Opening + PerProtocolState::Opening { + deadline: Delay::new(Instant::now() + Duration::from_secs(60)) + } } else if incoming.iter().any(|s| s.is_multiplex()) { let event = CustomProtosHandlerOut::CustomProtocolOpen { @@ -215,7 +225,7 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { } } - st @ PerProtocolState::Opening => { return_value = None; st } + st @ PerProtocolState::Opening { .. } => { return_value = None; st } st @ PerProtocolState::BackCompat { .. } => { return_value = None; st } st @ PerProtocolState::Normal { .. } => { return_value = None; st } PerProtocolState::Disabled { shutdown, .. } => { @@ -242,14 +252,14 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { PerProtocolState::Poisoned } - PerProtocolState::Init(mut shutdown) => { + PerProtocolState::Init { substreams: mut shutdown, .. } => { for s in &mut shutdown { s.shutdown(); } PerProtocolState::Disabled { shutdown, reenable: false } } - PerProtocolState::Opening => { + PerProtocolState::Opening { .. } => { PerProtocolState::Disabled { shutdown: SmallVec::new(), reenable: false } } @@ -296,12 +306,12 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { PerProtocolState::Poisoned } - PerProtocolState::Init(mut list) => { + PerProtocolState::Init { substreams: mut list, .. } => { for s in &mut list { s.shutdown(); } PerProtocolState::ShuttingDown(list) } - PerProtocolState::Opening => { + PerProtocolState::Opening { .. } => { PerProtocolState::ShuttingDown(SmallVec::new()) } @@ -347,14 +357,41 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { PerProtocolState::Poisoned } - st @ PerProtocolState::Init(_) => { + PerProtocolState::Init { substreams, mut init_deadline } => { + match init_deadline.poll() { + Ok(Async::Ready(())) => + error!(target: "sub-libp2p", "Handler initialization process is too long"), + Ok(Async::NotReady) => {} + Err(_) => error!(target: "sub-libp2p", "Tokio timer has errored") + } + return_value = None; - st + PerProtocolState::Init { substreams, init_deadline } } - st @ PerProtocolState::Opening { .. } => { - return_value = None; - st + PerProtocolState::Opening { mut deadline } => { + match deadline.poll() { + Ok(Async::Ready(())) => { + deadline.reset(Instant::now() + Duration::from_secs(60)); + let event = CustomProtosHandlerOut::ProtocolError { + protocol_id: self.protocol.id(), + is_severe: false, + error: "Timeout when opening protocol".to_string().into(), + }; + return_value = Some(ProtocolsHandlerEvent::Custom(event)); + PerProtocolState::Opening { deadline } + }, + Ok(Async::NotReady) => { + return_value = None; + PerProtocolState::Opening { deadline } + }, + Err(_) => { + error!(target: "sub-libp2p", "Tokio timer has errored"); + deadline.reset(Instant::now() + Duration::from_secs(60)); + return_value = None; + PerProtocolState::Opening { deadline } + }, + } } PerProtocolState::BackCompat { mut substream, shutdown } => { @@ -423,7 +460,9 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { upgrade: self.protocol.clone(), info: self.protocol.id(), }); - PerProtocolState::Opening + PerProtocolState::Opening { + deadline: Delay::new(Instant::now() + Duration::from_secs(60)) + } } else { return_value = None; PerProtocolState::Disabled { shutdown, reenable } @@ -622,7 +661,10 @@ where protocols: protocols.0.into_iter().map(|protocol| { PerProtocol { protocol, - state: PerProtocolState::Init(SmallVec::new()), + state: PerProtocolState::Init { + substreams: SmallVec::new(), + init_deadline: Delay::new(Instant::now() + Duration::from_secs(5)) + }, } }).collect(), events_queue: SmallVec::new(), @@ -672,15 +714,15 @@ where PerProtocolState::Poisoned } - PerProtocolState::Init(mut incoming) => { + PerProtocolState::Init { mut substreams, init_deadline } => { if substream.endpoint() == Endpoint::Dialer { error!(target: "sub-libp2p", "Opened dialing substream before initialization"); } - incoming.push(substream); - PerProtocolState::Init(incoming) + substreams.push(substream); + PerProtocolState::Init { substreams, init_deadline } } - PerProtocolState::Opening => { + PerProtocolState::Opening { .. } => { let event = CustomProtosHandlerOut::CustomProtocolOpen { protocol_id: substream.protocol_id(), version: substream.protocol_version() @@ -893,15 +935,23 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { return KeepAlive::Until(self.warm_up_end) } + let mut keep_forever = false; + for protocol in self.protocols.iter() { match protocol.state { + PerProtocolState::Init { .. } | PerProtocolState::Opening { .. } => {} + PerProtocolState::BackCompat { .. } | PerProtocolState::Normal { .. } => + keep_forever = true, PerProtocolState::Disabled { .. } | PerProtocolState::ShuttingDown(_) | PerProtocolState::Poisoned => return KeepAlive::Now, - _ => {} } } - KeepAlive::Forever + if keep_forever { + KeepAlive::Forever + } else { + KeepAlive::Now + } } fn shutdown(&mut self) {