Limit number of incoming connections (#391)

* Limit number of incoming connections

* Check Endpoint::Listener before checking num_open_connections.incoming

* Maintain at least 1-1/n portion of outgoing connections

* Remove use

* Default incoming_peers_factor to 2

* Use max_incoming_peers and max_outgoing peers to check whether connections should be dropped

* Fix expected_max_peers: reserved peers are not counted in config.max_peers

* typo: fix test
This commit is contained in:
Wei Tang
2018-07-27 22:13:27 +08:00
committed by Gav Wood
parent 988440a368
commit 6d0bed8a65
2 changed files with 51 additions and 25 deletions
@@ -29,7 +29,6 @@ use {Error, ErrorKind, NetworkConfiguration, NonReservedPeerMode};
use {NodeIndex, ProtocolId, SessionInfo}; use {NodeIndex, ProtocolId, SessionInfo};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use rand::{self, Rng}; use rand::{self, Rng};
use std::cmp;
use std::fs; use std::fs;
use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write};
use std::path::Path; use std::path::Path;
@@ -51,10 +50,10 @@ pub struct NetworkState {
/// Active connections. /// Active connections.
connections: RwLock<Connections>, connections: RwLock<Connections>,
/// `min_peers` taken from the configuration. /// Maximum incoming peers.
min_peers: u32, max_incoming_peers: u32,
/// `max_peers` taken from the configuration. /// Maximum outgoing peers.
max_peers: u32, max_outgoing_peers: u32,
/// If true, only reserved peers can connect. /// If true, only reserved peers can connect.
reserved_only: atomic::AtomicBool, reserved_only: atomic::AtomicBool,
@@ -201,13 +200,12 @@ impl NetworkState {
RwLock::new(reserved_peers) RwLock::new(reserved_peers)
}; };
let expected_max_peers = cmp::max(config.max_peers as usize, let expected_max_peers = config.max_peers as usize + config.reserved_nodes.len();
config.reserved_nodes.len());
Ok(NetworkState { Ok(NetworkState {
node_store, node_store,
min_peers: config.min_peers, max_outgoing_peers: config.min_peers,
max_peers: config.max_peers, max_incoming_peers: config.max_peers.saturating_sub(config.min_peers),
connections: RwLock::new(Connections { connections: RwLock::new(Connections {
peer_by_nodeid: FnvHashMap::with_capacity_and_hasher(expected_max_peers, Default::default()), peer_by_nodeid: FnvHashMap::with_capacity_and_hasher(expected_max_peers, Default::default()),
info_by_peer: FnvHashMap::with_capacity_and_hasher(expected_max_peers, Default::default()), info_by_peer: FnvHashMap::with_capacity_and_hasher(expected_max_peers, Default::default()),
@@ -464,19 +462,14 @@ impl NetworkState {
} }
} }
/// Returns the number of open and pending connections with
/// custom protocols.
pub fn num_open_custom_connections(&self) -> u32 {
num_open_custom_connections(&self.connections.read())
}
/// Returns the number of new outgoing custom connections to peers to /// Returns the number of new outgoing custom connections to peers to
/// open. This takes into account the number of active peers. /// open. This takes into account the number of active peers.
pub fn should_open_outgoing_custom_connections(&self) -> u32 { pub fn should_open_outgoing_custom_connections(&self) -> u32 {
if self.reserved_only.load(atomic::Ordering::Relaxed) { if self.reserved_only.load(atomic::Ordering::Relaxed) {
0 0
} else { } else {
self.min_peers.saturating_sub(self.num_open_custom_connections()) let num_open_custom_connections = num_open_custom_connections(&self.connections.read(), &self.reserved_peers.read());
self.max_outgoing_peers.saturating_sub(num_open_custom_connections.unreserved_outgoing)
} }
} }
@@ -572,7 +565,7 @@ impl NetworkState {
let who = accept_connection(&mut connections, &self.next_node_index, let who = accept_connection(&mut connections, &self.next_node_index,
node_id.clone(), endpoint)?; node_id.clone(), endpoint)?;
let num_open_connections = num_open_custom_connections(&connections); let num_open_connections = num_open_custom_connections(&connections, &self.reserved_peers.read());
let infos = connections.info_by_peer.get_mut(&who) let infos = connections.info_by_peer.get_mut(&who)
.expect("Newly-created peer id is always valid"); .expect("Newly-created peer id is always valid");
@@ -580,7 +573,10 @@ impl NetworkState {
let node_is_reserved = self.reserved_peers.read().contains(&infos.id); let node_is_reserved = self.reserved_peers.read().contains(&infos.id);
if !node_is_reserved { if !node_is_reserved {
if self.reserved_only.load(atomic::Ordering::Relaxed) || if self.reserved_only.load(atomic::Ordering::Relaxed) ||
num_open_connections >= self.max_peers (endpoint == Endpoint::Listener &&
num_open_connections.unreserved_incoming >= self.max_incoming_peers) ||
(endpoint == Endpoint::Dialer &&
num_open_connections.unreserved_outgoing >= self.max_outgoing_peers)
{ {
debug!(target: "sub-libp2p", "Refusing node {:?} because we reached the max number of peers", node_id); debug!(target: "sub-libp2p", "Refusing node {:?} because we reached the max number of peers", node_id);
return Err(IoError::new(IoErrorKind::PermissionDenied, "maximum number of peers reached")) return Err(IoError::new(IoErrorKind::PermissionDenied, "maximum number of peers reached"))
@@ -771,10 +767,19 @@ fn is_peer_disabled(
} }
} }
struct OpenCustomConnectionsNumbers {
/// Total number of open and pending connections.
pub total: u32,
/// Unreserved incoming number of open and pending connections.
pub unreserved_incoming: u32,
/// Unreserved outgoing number of open and pending connections.
pub unreserved_outgoing: u32,
}
/// Returns the number of open and pending connections with /// Returns the number of open and pending connections with
/// custom protocols. /// custom protocols.
fn num_open_custom_connections(connections: &Connections) -> u32 { fn num_open_custom_connections(connections: &Connections, reserved_peers: &FnvHashSet<PeerId>) -> OpenCustomConnectionsNumbers {
connections let filtered = connections
.info_by_peer .info_by_peer
.values() .values()
.filter(|info| .filter(|info|
@@ -784,8 +789,29 @@ fn num_open_custom_connections(connections: &Connections) -> u32 {
_ => false _ => false
} }
) )
) );
.count() as u32
let mut total: u32 = 0;
let mut unreserved_incoming: u32 = 0;
let mut unreserved_outgoing: u32 = 0;
for info in filtered {
total += 1;
let node_is_reserved = reserved_peers.contains(&info.id);
if !node_is_reserved {
if !info.originated {
unreserved_incoming += 1;
} else {
unreserved_outgoing += 1;
}
}
}
OpenCustomConnectionsNumbers {
total,
unreserved_incoming,
unreserved_outgoing,
}
} }
/// Parses an address of the form `/ip4/x.x.x.x/tcp/x/p2p/xxxxxx`, and adds it /// Parses an address of the form `/ip4/x.x.x.x/tcp/x/p2p/xxxxxx`, and adds it