mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-28 23:47:25 +00:00
Fix the networking (#364)
* Serve only non-empty Kademlia nodes * Use the number of custom protos to determine whether to open more * Add timeout when connecting * Connect to random peers from the peer store * Various adjustements * Typo * Explicitely connect to bootnodes * Fix potential overflow
This commit is contained in:
@@ -11,7 +11,7 @@ bytes = "0.4"
|
||||
error-chain = { version = "0.12", default-features = false }
|
||||
fnv = "1.0"
|
||||
futures = "0.1"
|
||||
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "2fb5ef1d40f2565e592248abbd21b7ca2da992e0", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
|
||||
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "727e0e099b53a4032a7e2330994c819fe866add7", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
|
||||
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
|
||||
ethkey = { git = "https://github.com/paritytech/parity.git" }
|
||||
ethereum-types = "0.3"
|
||||
|
||||
@@ -16,9 +16,9 @@
|
||||
|
||||
use bytes::Bytes;
|
||||
use fnv::{FnvHashMap, FnvHashSet};
|
||||
use futures::{future, sync::mpsc};
|
||||
use futures::sync::mpsc;
|
||||
use libp2p::core::{Multiaddr, AddrComponent, Endpoint, UniqueConnec};
|
||||
use libp2p::core::{PeerId as PeerstorePeerId, PublicKey};
|
||||
use libp2p::core::{UniqueConnecState, PeerId as PeerstorePeerId, PublicKey};
|
||||
use libp2p::kad::KadConnecController;
|
||||
use libp2p::peerstore::{Peerstore, PeerAccess};
|
||||
use libp2p::peerstore::json_peerstore::JsonPeerstore;
|
||||
@@ -192,7 +192,28 @@ impl NetworkState {
|
||||
&self.local_public_key
|
||||
}
|
||||
|
||||
/// Returns all the IDs of the peer we have knowledge of.
|
||||
/// Returns the ID of a random peer of the network.
|
||||
///
|
||||
/// Returns `None` if we don't know any peer.
|
||||
pub fn random_peer(&self) -> Option<PeerstorePeerId> {
|
||||
// TODO: optimize by putting the operation directly in the peerstore
|
||||
// https://github.com/libp2p/rust-libp2p/issues/316
|
||||
let peers = match self.peerstore {
|
||||
PeersStorage::Memory(ref mem) =>
|
||||
mem.peers().collect::<Vec<_>>(),
|
||||
PeersStorage::Json(ref json) =>
|
||||
json.peers().collect::<Vec<_>>(),
|
||||
};
|
||||
|
||||
if peers.is_empty() {
|
||||
return None
|
||||
}
|
||||
|
||||
let nth = rand::random::<usize>() % peers.len();
|
||||
Some(peers[nth].clone())
|
||||
}
|
||||
|
||||
/// Returns all the IDs of the peers on the network we have knowledge of.
|
||||
///
|
||||
/// This includes peers we are not connected to.
|
||||
pub fn known_peers(&self) -> impl Iterator<Item = PeerstorePeerId> {
|
||||
@@ -402,11 +423,32 @@ impl NetworkState {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if we should open a new outgoing connection to a peer.
|
||||
/// This takes into account the number of active peers.
|
||||
pub fn should_open_outgoing_connections(&self) -> bool {
|
||||
!self.reserved_only.load(atomic::Ordering::Relaxed) &&
|
||||
self.connections.read().peer_by_nodeid.len() < self.min_peers as usize
|
||||
/// Returns the number of open and pending connections with
|
||||
/// custom protocols.
|
||||
pub fn num_open_custom_connections(&self) -> u32 {
|
||||
self.connections
|
||||
.read()
|
||||
.info_by_peer
|
||||
.values()
|
||||
.filter(|info|
|
||||
info.protocols.iter().any(|&(_, ref connec)|
|
||||
match connec.state() {
|
||||
UniqueConnecState::Pending | UniqueConnecState::Full => true,
|
||||
_ => false
|
||||
}
|
||||
)
|
||||
)
|
||||
.count() as u32
|
||||
}
|
||||
|
||||
/// Returns the number of new outgoing custom connections to peers to
|
||||
/// open. This takes into account the number of active peers.
|
||||
pub fn should_open_outgoing_custom_connections(&self) -> u32 {
|
||||
if self.reserved_only.load(atomic::Ordering::Relaxed) {
|
||||
0
|
||||
} else {
|
||||
self.min_peers.saturating_sub(self.num_open_custom_connections())
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if we are connected to the given node.
|
||||
|
||||
@@ -29,6 +29,7 @@ use libp2p::core::{upgrade, Transport, MuxedTransport, ConnectionUpgrade};
|
||||
use libp2p::core::{Endpoint, PeerId as PeerstorePeerId, PublicKey};
|
||||
use libp2p::core::SwarmController;
|
||||
use libp2p::ping;
|
||||
use libp2p::transport_timeout::TransportTimeout;
|
||||
use {PacketId, SessionInfo, ConnectionFilter, TimerToken};
|
||||
use rand;
|
||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||
@@ -115,7 +116,7 @@ impl NetworkService {
|
||||
let kad_system = KadSystem::without_init(KadSystemConfig {
|
||||
parallelism: 3,
|
||||
local_peer_id: local_peer_id.clone(),
|
||||
kbuckets_timeout: Duration::from_secs(10),
|
||||
kbuckets_timeout: Duration::from_secs(600),
|
||||
request_timeout: Duration::from_secs(10),
|
||||
known_initial_peers: network_state.known_peers().collect(),
|
||||
});
|
||||
@@ -407,8 +408,10 @@ fn init_thread(
|
||||
let shared = shared.clone();
|
||||
move |peer_id| {
|
||||
let addrs = shared.network_state.addrs_of_peer(&peer_id);
|
||||
trace!(target: "sub-libp2p", "Peer store: loaded {} addresses \
|
||||
for {:?}", addrs.len(), peer_id);
|
||||
for addr in &addrs {
|
||||
trace!(target: "sub-libp2p", "{:?} resolved as {}",
|
||||
peer_id, addr);
|
||||
}
|
||||
addrs.into_iter()
|
||||
}
|
||||
};
|
||||
@@ -477,6 +480,29 @@ fn init_thread(
|
||||
}
|
||||
}
|
||||
|
||||
// Explicitely connect to the boostrap nodes as a temporary measure.
|
||||
for bootnode in shared.config.boot_nodes.iter() {
|
||||
// TODO: this code is copy-pasted from `network_state`, but it is
|
||||
// temporary anyway
|
||||
let mut addr: Multiaddr = bootnode.parse()
|
||||
.map_err(|_| ErrorKind::AddressParse)?;
|
||||
let p2p_component = addr.pop().ok_or(ErrorKind::AddressParse)?;
|
||||
let peer_id = match p2p_component {
|
||||
AddrComponent::P2P(key) | AddrComponent::IPFS(key) =>
|
||||
PeerstorePeerId::from_bytes(key).map_err(|_| ErrorKind::AddressParse)?,
|
||||
_ => return Err(ErrorKind::BadProtocol.into()),
|
||||
};
|
||||
|
||||
trace!(target: "sub-libp2p", "Dialing bootnode {:?}", peer_id);
|
||||
for proto in shared.protocols.read().0.clone().into_iter() {
|
||||
open_peer_custom_proto(shared.clone(), transport.clone(),
|
||||
proto, peer_id.clone(), &swarm_controller)
|
||||
}
|
||||
}
|
||||
|
||||
// Start connecting to nodes now.
|
||||
connect_to_nodes(shared.clone(), transport.clone(), &swarm_controller);
|
||||
|
||||
// Build the timeouts system for the `register_timeout` function.
|
||||
// (note: this has nothing to do with socket timeouts)
|
||||
let timeouts = timeouts::build_timeouts_stream(core.clone(), timeouts_register_rx)
|
||||
@@ -609,10 +635,11 @@ fn handle_kademlia_connection(
|
||||
|
||||
/// When a remote performs a `FIND_NODE` Kademlia request for `searched`,
|
||||
/// this function builds the response to send back.
|
||||
fn build_kademlia_response(shared: &Arc<Shared>, searched: &PeerstorePeerId)
|
||||
-> Vec<KadPeer> {
|
||||
fn build_kademlia_response(
|
||||
shared: &Arc<Shared>,
|
||||
searched: &PeerstorePeerId
|
||||
) -> Vec<KadPeer> {
|
||||
shared.kad_system
|
||||
// TODO the iter of `known_closest_peers` should be infinite
|
||||
.known_closest_peers(searched)
|
||||
.map(move |peer_id| {
|
||||
if peer_id == *shared.kad_system.local_peer_id() {
|
||||
@@ -638,6 +665,13 @@ fn build_kademlia_response(shared: &Arc<Shared>, searched: &PeerstorePeerId)
|
||||
}
|
||||
}
|
||||
})
|
||||
// TODO: we really want to remove nodes with no multiaddress from
|
||||
// the results, but a flaw in the Kad protocol of libp2p makes it
|
||||
// impossible to return empty results ; therefore we must at least
|
||||
// return ourselves
|
||||
.filter(|p| p.node_id == *shared.kad_system.local_peer_id() ||
|
||||
!p.multiaddrs.is_empty())
|
||||
.take(20)
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
@@ -769,8 +803,6 @@ fn start_kademlia_discovery<T, To, St, C>(shared: Arc<Shared>, transport: T,
|
||||
To: AsyncRead + AsyncWrite + 'static,
|
||||
St: MuxedTransport<Output = (FinalUpgrade<C>, Endpoint)> + Clone + 'static,
|
||||
C: 'static {
|
||||
let local_peer_id = shared.network_state.local_public_key().clone().into_peer_id();
|
||||
|
||||
let kad_init = shared.kad_system.perform_initialization({
|
||||
let shared = shared.clone();
|
||||
let transport = transport.clone();
|
||||
@@ -796,7 +828,7 @@ fn start_kademlia_discovery<T, To, St, C>(shared: Arc<Shared>, transport: T,
|
||||
// that we don't need to run a timer just for flushing.
|
||||
let _ = shared.network_state.flush_caches_to_disk();
|
||||
|
||||
if shared.network_state.should_open_outgoing_connections() {
|
||||
if shared.network_state.should_open_outgoing_custom_connections() != 0 {
|
||||
future::Either::A(perform_kademlia_query(shared.clone(),
|
||||
transport.clone(), swarm_controller.clone()))
|
||||
} else {
|
||||
@@ -804,15 +836,15 @@ fn start_kademlia_discovery<T, To, St, C>(shared: Arc<Shared>, transport: T,
|
||||
// `min_peers`), pretend we did a lookup but with an empty
|
||||
// result.
|
||||
trace!(target: "sub-libp2p", "Bypassing kademlia discovery");
|
||||
future::Either::B(future::ok(Vec::new()))
|
||||
future::Either::B(future::ok(()))
|
||||
}
|
||||
}
|
||||
})
|
||||
.for_each({
|
||||
let shared = shared.clone();
|
||||
move |results| {
|
||||
process_kad_results(shared.clone(), transport.clone(),
|
||||
swarm_controller.clone(), results, &local_peer_id);
|
||||
move |_| {
|
||||
connect_to_nodes(shared.clone(), transport.clone(),
|
||||
&swarm_controller);
|
||||
Ok(())
|
||||
}
|
||||
});
|
||||
@@ -826,10 +858,14 @@ fn start_kademlia_discovery<T, To, St, C>(shared: Arc<Shared>, transport: T,
|
||||
Box::new(final_future) as Box<Future<Item = _, Error = _>>
|
||||
}
|
||||
|
||||
/// Performs a kademlia request to a random node, and returns the results.
|
||||
fn perform_kademlia_query<T, To, St, C>(shared: Arc<Shared>, transport: T,
|
||||
swarm_controller: SwarmController<St>)
|
||||
-> impl Future<Item = Vec<PeerstorePeerId>, Error = IoError>
|
||||
/// Performs a kademlia request to a random node.
|
||||
/// Note that we don't actually care about the results, so the future
|
||||
/// produces `()`.
|
||||
fn perform_kademlia_query<T, To, St, C>(
|
||||
shared: Arc<Shared>,
|
||||
transport: T,
|
||||
swarm_controller: SwarmController<St>
|
||||
) -> impl Future<Item = (), Error = IoError>
|
||||
where T: MuxedTransport<Output = TransportOutput<To>> + Clone + 'static,
|
||||
T::MultiaddrFuture: 'static,
|
||||
To: AsyncRead + AsyncWrite + 'static,
|
||||
@@ -865,52 +901,47 @@ fn perform_kademlia_query<T, To, St, C>(shared: Arc<Shared>, transport: T,
|
||||
}
|
||||
None
|
||||
},
|
||||
KadQueryEvent::Finished(out) => Some(out),
|
||||
KadQueryEvent::Finished(_) => Some(()),
|
||||
}
|
||||
)
|
||||
.into_future()
|
||||
.map_err(|(err, _)| err)
|
||||
.map(|(out, _)| out.unwrap())
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
/// Processes the results of a Kademlia discovery.
|
||||
fn process_kad_results<T, To, St, C>(shared: Arc<Shared>, transport: T,
|
||||
swarm_controller: SwarmController<St>, results: Vec<PeerstorePeerId>,
|
||||
local_peer_id: &PeerstorePeerId)
|
||||
/// Connects to additional nodes, if necessary.
|
||||
fn connect_to_nodes<T, To, St, C>(
|
||||
shared: Arc<Shared>,
|
||||
base_transport: T,
|
||||
swarm_controller: &SwarmController<St>
|
||||
)
|
||||
where T: MuxedTransport<Output = TransportOutput<To>> + Clone + 'static,
|
||||
T::MultiaddrFuture: 'static,
|
||||
To: AsyncRead + AsyncWrite + 'static,
|
||||
St: MuxedTransport<Output = (FinalUpgrade<C>, Endpoint)> + Clone + 'static,
|
||||
C: 'static {
|
||||
trace!(target: "sub-libp2p", "Processing Kademlia discovery results \
|
||||
(len = {})", results.len());
|
||||
let num_slots = shared.network_state.should_open_outgoing_custom_connections();
|
||||
debug!(target: "sub-libp2p", "Opening up to {} outgoing connections",
|
||||
num_slots);
|
||||
|
||||
for discovered_peer in results {
|
||||
// Skip if we reach `min_peers`.
|
||||
// Also skip nodes we are already connected to, in order to not connect twice.
|
||||
// TODO: better API in network_state
|
||||
if !shared.network_state.should_open_outgoing_connections() ||
|
||||
discovered_peer == *local_peer_id ||
|
||||
shared.network_state.is_peer_disabled(&discovered_peer)
|
||||
{
|
||||
trace!(target: "sub-libp2p", "Skipping discovered node {:?}", discovered_peer);
|
||||
continue
|
||||
}
|
||||
for _ in 0 .. num_slots {
|
||||
// Choose a random peer. We are potentially already connected to
|
||||
// this peer, but this is not a problem as this function is called
|
||||
// regularly.
|
||||
// TODO: is it ^ ?
|
||||
let peer = match shared.network_state.random_peer() {
|
||||
Some(p) => p,
|
||||
// `None` returned when no peer is known
|
||||
None => break,
|
||||
};
|
||||
|
||||
// Try to dial that node for each registered protocol. Since dialing
|
||||
// upgrades the connection to use multiplexing, dialing multiple times
|
||||
// should automatically open multiple substreams.
|
||||
let addr: Multiaddr = AddrComponent::P2P(discovered_peer.clone().into_bytes()).into();
|
||||
trace!(target: "sub-libp2p", "Dialing discovered node {:?} for each protocol", addr);
|
||||
trace!(target: "sub-libp2p", "Ensuring connection to {:?}", peer);
|
||||
for proto in shared.protocols.read().0.clone().into_iter() {
|
||||
open_peer_custom_proto(
|
||||
shared.clone(),
|
||||
transport.clone(),
|
||||
proto,
|
||||
addr.clone(),
|
||||
discovered_peer.clone(),
|
||||
&swarm_controller
|
||||
);
|
||||
open_peer_custom_proto(shared.clone(), base_transport.clone(),
|
||||
proto, peer.clone(), swarm_controller)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -923,7 +954,6 @@ fn open_peer_custom_proto<T, To, St, C>(
|
||||
shared: Arc<Shared>,
|
||||
base_transport: T,
|
||||
proto: RegisteredProtocol<Arc<NetworkProtocolHandler + Send + Sync>>,
|
||||
addr: Multiaddr,
|
||||
expected_peer_id: PeerstorePeerId,
|
||||
swarm_controller: &SwarmController<St>
|
||||
)
|
||||
@@ -933,9 +963,20 @@ fn open_peer_custom_proto<T, To, St, C>(
|
||||
St: MuxedTransport<Output = (FinalUpgrade<C>, Endpoint)> + Clone + 'static,
|
||||
C: 'static,
|
||||
{
|
||||
// Don't connect to ourselves.
|
||||
if &expected_peer_id == shared.kad_system.local_peer_id() {
|
||||
return
|
||||
}
|
||||
|
||||
// Don't connect to a disabled peer.
|
||||
if shared.network_state.is_peer_disabled(&expected_peer_id) {
|
||||
return
|
||||
}
|
||||
|
||||
let proto_id = proto.id();
|
||||
let peer_id = expected_peer_id.clone();
|
||||
let shared2 = shared.clone();
|
||||
let addr: Multiaddr = AddrComponent::P2P(expected_peer_id.clone().into_bytes()).into();
|
||||
|
||||
// TODO: check that the secio key matches the id given by kademlia
|
||||
let with_proto = base_transport
|
||||
@@ -964,10 +1005,22 @@ fn open_peer_custom_proto<T, To, St, C>(
|
||||
.and_then(move |out, endpoint, client_addr|
|
||||
future::ok(((FinalUpgrade::Custom(out), endpoint), client_addr))
|
||||
);
|
||||
|
||||
let with_timeout = TransportTimeout::new(with_proto,
|
||||
Duration::from_secs(20));
|
||||
let with_err = with_timeout
|
||||
.map_err({
|
||||
let peer_id = peer_id.clone();
|
||||
move |err| {
|
||||
debug!(target: "sub-libp2p", "Error while dialing \
|
||||
{:?} with custom proto: {:?}", peer_id, err);
|
||||
err
|
||||
}
|
||||
});
|
||||
|
||||
if let Ok(unique_connec) = shared2.network_state
|
||||
.custom_proto(peer_id, proto_id, Endpoint::Dialer) {
|
||||
let _ = unique_connec.1.get_or_dial(&swarm_controller, &addr, with_proto);
|
||||
let _ = unique_connec.1.get_or_dial(&swarm_controller, &addr, with_err);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
|
||||
use libp2p::{self, Transport, secio};
|
||||
use libp2p::core::{MuxedTransport, either, upgrade};
|
||||
use libp2p::transport_timeout::TransportTimeout;
|
||||
use std::time::Duration;
|
||||
use tokio_core::reactor::Handle;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
@@ -25,7 +27,7 @@ pub fn build_transport(
|
||||
unencrypted_allowed: UnencryptedAllowed,
|
||||
local_private_key: secio::SecioKeyPair
|
||||
) -> impl MuxedTransport<Output = impl AsyncRead + AsyncWrite> + Clone {
|
||||
libp2p::CommonTransport::new(core)
|
||||
let base = libp2p::CommonTransport::new(core)
|
||||
.with_upgrade({
|
||||
let secio = secio::SecioConfig {
|
||||
key: local_private_key,
|
||||
@@ -51,7 +53,14 @@ pub fn build_transport(
|
||||
// TODO: check that the public key matches what is reported by identify
|
||||
.map(|(socket, _key), _| socket)
|
||||
.with_upgrade(libp2p::mplex::MultiplexConfig::new())
|
||||
.into_connection_reuse()
|
||||
.into_connection_reuse();
|
||||
|
||||
TransportTimeout::new(base, Duration::from_secs(20))
|
||||
.map_err(|err| {
|
||||
debug!(target: "sub-libp2p", "Error in base transport \
|
||||
layer: {:?}", err);
|
||||
err
|
||||
})
|
||||
}
|
||||
|
||||
/// Specifies whether unencrypted communications are allowed or denied.
|
||||
|
||||
Reference in New Issue
Block a user