mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 10:01:17 +00:00
Add a substrate-peerset crate (#2042)
* Add a substrate-peerset crate * Some adjustements * More adjustements * Use a temporary libp2p branch * Add back-off mechanism * Fix RPC tests * Some adjustements * Another libp2p bugfix * Do a round-robin in the peerset * Use a real dependency instead of a patch for libp2p * Initialize reserved nodes correctly * Better diagnostic for no address * Don't allocate slots if in reserved only * Ban node on dial failure * Fix indentation
This commit is contained in:
committed by
Robert Habermeier
parent
f6f15b618e
commit
90c6f85db5
@@ -15,14 +15,13 @@
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::custom_proto::{CustomProto, CustomProtoOut, RegisteredProtocol};
|
||||
use crate::NetworkConfiguration;
|
||||
use futures::prelude::*;
|
||||
use libp2p::NetworkBehaviour;
|
||||
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey};
|
||||
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
|
||||
use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters};
|
||||
use libp2p::identify::{Identify, IdentifyEvent, protocol::IdentifyInfo};
|
||||
use libp2p::kad::{Kademlia, KademliaOut, KadConnectionType};
|
||||
use libp2p::kad::{Kademlia, KademliaOut};
|
||||
use libp2p::ping::{Ping, PingEvent};
|
||||
use log::{debug, trace, warn};
|
||||
use std::{cmp, io, fmt, time::Duration, time::Instant};
|
||||
@@ -50,21 +49,35 @@ pub struct Behaviour<TMessage, TSubstream> {
|
||||
|
||||
impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
||||
/// Builds a new `Behaviour`.
|
||||
// TODO: redundancy between config and local_public_key (https://github.com/libp2p/rust-libp2p/issues/745)
|
||||
pub fn new(config: &NetworkConfiguration, local_public_key: PublicKey, protocols: RegisteredProtocol<TMessage>) -> Self {
|
||||
pub fn new(
|
||||
user_agent: String,
|
||||
local_public_key: PublicKey,
|
||||
protocol: RegisteredProtocol<TMessage>,
|
||||
known_addresses: Vec<(PeerId, Multiaddr)>,
|
||||
peerset: substrate_peerset::PeersetMut,
|
||||
) -> Self {
|
||||
let identify = {
|
||||
let proto_version = "/substrate/1.0".to_string();
|
||||
let user_agent = format!("{} ({})", config.client_version, config.node_name);
|
||||
Identify::new(proto_version, user_agent, local_public_key.clone())
|
||||
};
|
||||
|
||||
let local_peer_id = local_public_key.into_peer_id();
|
||||
let custom_protocols = CustomProto::new(config, &local_peer_id, protocols);
|
||||
let custom_protocols = CustomProto::new(protocol, peerset);
|
||||
|
||||
let mut kademlia = Kademlia::without_init(local_public_key.into_peer_id());
|
||||
for (peer_id, addr) in &known_addresses {
|
||||
kademlia.add_connected_address(peer_id, addr.clone());
|
||||
}
|
||||
kademlia.initialize();
|
||||
|
||||
Behaviour {
|
||||
ping: Ping::new(),
|
||||
custom_protocols,
|
||||
discovery: DiscoveryBehaviour::new(local_peer_id),
|
||||
discovery: DiscoveryBehaviour {
|
||||
user_defined: known_addresses,
|
||||
kademlia,
|
||||
next_kad_random_query: Delay::new(Instant::now()),
|
||||
duration_to_next_kad: Duration::from_secs(60),
|
||||
},
|
||||
identify,
|
||||
events: Vec::new(),
|
||||
}
|
||||
@@ -81,67 +94,9 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
||||
self.custom_protocols.send_packet(target, data)
|
||||
}
|
||||
|
||||
/// Returns the number of peers in the topology.
|
||||
pub fn num_topology_peers(&self) -> usize {
|
||||
self.custom_protocols.num_topology_peers()
|
||||
}
|
||||
|
||||
/// Flushes the topology to the disk.
|
||||
pub fn flush_topology(&mut self) -> Result<(), io::Error> {
|
||||
self.custom_protocols.flush_topology()
|
||||
}
|
||||
|
||||
/// Perform a cleanup pass, removing all obsolete addresses and peers.
|
||||
///
|
||||
/// This should be done from time to time.
|
||||
pub fn cleanup(&mut self) {
|
||||
self.custom_protocols.cleanup();
|
||||
}
|
||||
|
||||
/// Returns the list of reserved nodes.
|
||||
pub fn reserved_peers(&self) -> impl Iterator<Item = &PeerId> {
|
||||
self.custom_protocols.reserved_peers()
|
||||
}
|
||||
|
||||
/// Try to add a reserved peer.
|
||||
pub fn add_reserved_peer(&mut self, peer_id: PeerId, addr: Multiaddr) {
|
||||
self.custom_protocols.add_reserved_peer(peer_id, addr)
|
||||
}
|
||||
|
||||
/// Try to remove a reserved peer.
|
||||
///
|
||||
/// If we are in reserved mode and we were connected to a node with this peer ID, then this
|
||||
/// method will disconnect it and return its index.
|
||||
pub fn remove_reserved_peer(&mut self, peer_id: PeerId) {
|
||||
self.custom_protocols.remove_reserved_peer(peer_id)
|
||||
}
|
||||
|
||||
/// Returns true if we only accept reserved nodes.
|
||||
pub fn is_reserved_only(&self) -> bool {
|
||||
self.custom_protocols.is_reserved_only()
|
||||
}
|
||||
|
||||
/// Start accepting all peers again if we weren't.
|
||||
pub fn accept_unreserved_peers(&mut self) {
|
||||
self.custom_protocols.accept_unreserved_peers()
|
||||
}
|
||||
|
||||
/// Start refusing non-reserved nodes. Returns the list of nodes that have been disconnected.
|
||||
pub fn deny_unreserved_peers(&mut self) {
|
||||
self.custom_protocols.deny_unreserved_peers()
|
||||
}
|
||||
|
||||
/// Disconnects a peer and bans it for a little while.
|
||||
///
|
||||
/// Same as `drop_node`, except that the same peer will not be able to reconnect later.
|
||||
#[inline]
|
||||
pub fn ban_node(&mut self, peer_id: PeerId) {
|
||||
self.custom_protocols.ban_peer(peer_id)
|
||||
}
|
||||
|
||||
/// Returns a list of all the peers that are banned, and until when.
|
||||
pub fn banned_nodes(&self) -> impl Iterator<Item = (&PeerId, Instant)> {
|
||||
self.custom_protocols.banned_peers()
|
||||
/// Returns the list of nodes that we know exist in the network.
|
||||
pub fn known_peers(&self) -> impl Iterator<Item = &PeerId> {
|
||||
self.discovery.kademlia.kbuckets_entries()
|
||||
}
|
||||
|
||||
/// Returns true if we try to open protocols with the given peer.
|
||||
@@ -154,6 +109,13 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
||||
self.custom_protocols.is_open(peer_id)
|
||||
}
|
||||
|
||||
/// Adds a hard-coded address for the given peer, that never expires.
|
||||
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
|
||||
if self.discovery.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) {
|
||||
self.discovery.user_defined.push((peer_id, addr));
|
||||
}
|
||||
}
|
||||
|
||||
/// Disconnects the custom protocols from a peer.
|
||||
///
|
||||
/// The peer will still be able to use Kademlia or other protocols, but will get disconnected
|
||||
@@ -167,16 +129,6 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
||||
pub fn drop_node(&mut self, peer_id: &PeerId) {
|
||||
self.custom_protocols.disconnect_peer(peer_id)
|
||||
}
|
||||
|
||||
/// Returns the list of peers in the topology.
|
||||
pub fn known_peers(&self) -> impl Iterator<Item = &PeerId> {
|
||||
self.custom_protocols.known_peers()
|
||||
}
|
||||
|
||||
/// Returns a list of addresses known for this peer, and their reputation score.
|
||||
pub fn known_addresses(&mut self, peer_id: &PeerId) -> impl Iterator<Item = (&Multiaddr, u32)> {
|
||||
self.custom_protocols.known_addresses(peer_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Event that can be emitted by the behaviour.
|
||||
@@ -283,10 +235,7 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behav
|
||||
for addr in &info.listen_addrs {
|
||||
self.discovery.kademlia.add_connected_address(&peer_id, addr.clone());
|
||||
}
|
||||
self.custom_protocols.add_discovered_addrs(
|
||||
&peer_id,
|
||||
info.listen_addrs.iter().map(|addr| (addr.clone(), true))
|
||||
);
|
||||
self.custom_protocols.add_discovered_node(&peer_id);
|
||||
self.events.push(BehaviourOut::Identified { peer_id, info });
|
||||
}
|
||||
IdentifyEvent::Error { .. } => {}
|
||||
@@ -301,17 +250,16 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behav
|
||||
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behaviour<TMessage, TSubstream> {
|
||||
fn inject_event(&mut self, out: KademliaOut) {
|
||||
match out {
|
||||
KademliaOut::Discovered { peer_id, addresses, ty } => {
|
||||
self.custom_protocols.add_discovered_addrs(
|
||||
&peer_id,
|
||||
addresses.into_iter().map(|addr| (addr, ty == KadConnectionType::Connected))
|
||||
);
|
||||
KademliaOut::Discovered { .. } => {}
|
||||
KademliaOut::KBucketAdded { peer_id, .. } => {
|
||||
self.custom_protocols.add_discovered_node(&peer_id);
|
||||
}
|
||||
KademliaOut::FindNodeResult { key, closer_peers } => {
|
||||
trace!(target: "sub-libp2p", "Kademlia query for {:?} yielded {:?} results",
|
||||
trace!(target: "sub-libp2p", "Libp2p => Query for {:?} yielded {:?} results",
|
||||
key, closer_peers.len());
|
||||
if closer_peers.is_empty() {
|
||||
warn!(target: "sub-libp2p", "Kademlia random query has yielded empty results");
|
||||
warn!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
|
||||
results");
|
||||
}
|
||||
}
|
||||
// We never start any GET_PROVIDERS query.
|
||||
@@ -343,6 +291,9 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
||||
|
||||
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
|
||||
pub struct DiscoveryBehaviour<TSubstream> {
|
||||
/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
|
||||
/// reserved nodes.
|
||||
user_defined: Vec<(PeerId, Multiaddr)>,
|
||||
/// Kademlia requests and answers.
|
||||
kademlia: Kademlia<TSubstream>,
|
||||
/// Stream that fires when we need to perform the next random Kademlia query.
|
||||
@@ -351,16 +302,6 @@ pub struct DiscoveryBehaviour<TSubstream> {
|
||||
duration_to_next_kad: Duration,
|
||||
}
|
||||
|
||||
impl<TSubstream> DiscoveryBehaviour<TSubstream> {
|
||||
fn new(local_peer_id: PeerId) -> Self {
|
||||
DiscoveryBehaviour {
|
||||
kademlia: Kademlia::without_init(local_peer_id),
|
||||
next_kad_random_query: Delay::new(Instant::now()),
|
||||
duration_to_next_kad: Duration::from_secs(1),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> NetworkBehaviour for DiscoveryBehaviour<TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
@@ -373,7 +314,21 @@ where
|
||||
}
|
||||
|
||||
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||
self.kademlia.addresses_of_peer(peer_id)
|
||||
let mut list = self.user_defined.iter()
|
||||
.filter_map(|(p, a)| if p == peer_id { Some(a.clone()) } else { None })
|
||||
.collect::<Vec<_>>();
|
||||
list.extend(self.kademlia.addresses_of_peer(peer_id));
|
||||
trace!(target: "sub-libp2p", "Addresses of {:?} are {:?}", peer_id, list);
|
||||
if list.is_empty() {
|
||||
if self.kademlia.kbuckets_entries().any(|p| p == peer_id) {
|
||||
debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer in k-buckets), \
|
||||
and no address was found", peer_id);
|
||||
} else {
|
||||
debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer not in k-buckets), \
|
||||
and no address was found", peer_id);
|
||||
}
|
||||
}
|
||||
list
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
|
||||
@@ -417,8 +372,8 @@ where
|
||||
Ok(Async::NotReady) => break,
|
||||
Ok(Async::Ready(_)) => {
|
||||
let random_peer_id = PeerId::random();
|
||||
debug!(target: "sub-libp2p", "Starting random Kademlia request for {:?}",
|
||||
random_peer_id);
|
||||
debug!(target: "sub-libp2p", "Libp2p <= Starting random Kademlia request for \
|
||||
{:?}", random_peer_id);
|
||||
self.kademlia.find_node(random_peer_id);
|
||||
|
||||
// Reset the `Delay` to the next random.
|
||||
@@ -427,7 +382,7 @@ where
|
||||
Duration::from_secs(60));
|
||||
},
|
||||
Err(err) => {
|
||||
warn!(target: "sub-libp2p", "Kad query timer errored: {:?}", err);
|
||||
warn!(target: "sub-libp2p", "Kademlia query timer errored: {:?}", err);
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -799,10 +799,6 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
|
||||
is_severe,
|
||||
error: Box::new(err),
|
||||
}));
|
||||
|
||||
// If we failed to open a substream, there is little chance that we manage to open any
|
||||
// other substream ever again on this connection, and thus we disable the handler.
|
||||
self.disable();
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
|
||||
@@ -19,5 +19,4 @@ pub use self::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol};
|
||||
|
||||
mod behaviour;
|
||||
mod handler;
|
||||
mod topology;
|
||||
mod upgrade;
|
||||
|
||||
@@ -1,725 +0,0 @@
|
||||
// Copyright 2018-2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?
|
||||
|
||||
use fnv::FnvHashMap;
|
||||
use libp2p::{core::swarm::ConnectedPoint, Multiaddr, PeerId};
|
||||
use log::{debug, info, trace, warn};
|
||||
use serde_derive::{Serialize, Deserialize};
|
||||
use std::{cmp, fs};
|
||||
use std::io::{Read, Cursor, Error as IoError, ErrorKind as IoErrorKind, Write, BufReader, BufWriter};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
/// For each address we're connected to, a period of this duration increases the score by 1.
|
||||
const CONNEC_DURATION_PER_SCORE: Duration = Duration::from_secs(10);
|
||||
/// Maximum number of addresses for a given peer. If there are more than this number of addresses,
|
||||
/// the ones with a lower score will be removed.
|
||||
const MAX_ADDRESSES_PER_PEER: usize = 10;
|
||||
/// Maximum value for the score.
|
||||
const MAX_SCORE: u32 = 100;
|
||||
/// When we successfully connect to a node, raises its score to the given minimum value.
|
||||
const CONNECTED_MINIMUM_SCORE: u32 = 20;
|
||||
/// Initial score that a node discovered through Kademlia receives, where we have a hint that the
|
||||
/// node is reachable.
|
||||
const DISCOVERY_INITIAL_SCORE_CONNECTABLE: u32 = 15;
|
||||
/// Initial score that a node discovered through Kademlia receives, without any hint.
|
||||
const DISCOVERY_INITIAL_SCORE: u32 = 10;
|
||||
/// Score adjustement when we fail to connect to an address.
|
||||
const SCORE_DIFF_ON_FAILED_TO_CONNECT: i32 = -1;
|
||||
/// Default time-to-live for addresses discovered through Kademlia.
|
||||
/// After this time has elapsed and no connection has succeeded, the address will be removed.
|
||||
const KADEMLIA_DISCOVERY_EXPIRATION: Duration = Duration::from_secs(2 * 3600);
|
||||
/// After a successful connection, the TTL is set to a minimum at this amount.
|
||||
const EXPIRATION_PUSH_BACK_CONNEC: Duration = Duration::from_secs(2 * 3600);
|
||||
/// Initial score that a bootstrap node receives when registered.
|
||||
const BOOTSTRAP_NODE_SCORE: u32 = 100;
|
||||
/// Time to live of a boostrap node. This only applies if you start the node later *without*
|
||||
/// that bootstrap node configured anymore.
|
||||
const BOOTSTRAP_NODE_EXPIRATION: Duration = Duration::from_secs(24 * 3600);
|
||||
/// The first time we fail to connect to an address, wait this duration before trying again.
|
||||
const FIRST_CONNECT_FAIL_BACKOFF: Duration = Duration::from_secs(2);
|
||||
/// Every time we fail to connect to an address, multiply the backoff by this constant.
|
||||
const FAIL_BACKOFF_MULTIPLIER: u32 = 2;
|
||||
/// We need a maximum value for the backoff, overwise we risk an overflow.
|
||||
const MAX_BACKOFF: Duration = Duration::from_secs(30 * 60);
|
||||
|
||||
/// Stores information about the topology of the network.
|
||||
#[derive(Debug)]
|
||||
pub struct NetTopology {
|
||||
/// The actual storage. Never contains a key for `local_peer_id`.
|
||||
store: FnvHashMap<PeerId, PeerInfo>,
|
||||
/// Optional path to the file that caches the serialized version of `store`.
|
||||
cache_path: Option<PathBuf>,
|
||||
/// PeerId of the local node.
|
||||
local_peer_id: PeerId,
|
||||
}
|
||||
|
||||
impl NetTopology {
|
||||
/// Initializes a new `NetTopology` that isn't tied to any file.
|
||||
///
|
||||
/// `flush_to_disk()` will be a no-op.
|
||||
#[inline]
|
||||
pub fn memory(local_peer_id: PeerId) -> NetTopology {
|
||||
NetTopology {
|
||||
store: Default::default(),
|
||||
cache_path: None,
|
||||
local_peer_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a `NetTopology` that will use `path` as a cache.
|
||||
///
|
||||
/// This function tries to load a known topology from the file. If the file doesn't exist
|
||||
/// or contains garbage data, the execution still continues.
|
||||
///
|
||||
/// Calling `flush_to_disk()` in the future writes to the given path.
|
||||
pub fn from_file<P: AsRef<Path>>(local_peer_id: PeerId, path: P) -> NetTopology {
|
||||
let path = path.as_ref();
|
||||
debug!(target: "sub-libp2p", "Initializing peer store for JSON file {:?}", path);
|
||||
let store = try_load(path, &local_peer_id);
|
||||
NetTopology {
|
||||
store,
|
||||
cache_path: Some(path.to_owned()),
|
||||
local_peer_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Writes the topology into the path passed to `from_file`.
|
||||
///
|
||||
/// No-op if the object was created with `memory()`.
|
||||
pub fn flush_to_disk(&mut self) -> Result<(), IoError> {
|
||||
let path = match self.cache_path {
|
||||
Some(ref p) => p,
|
||||
None => return Ok(())
|
||||
};
|
||||
|
||||
let file = fs::File::create(path)?;
|
||||
// TODO: the capacity of the BufWriter is kind of arbitrary ; decide better
|
||||
serialize(BufWriter::with_capacity(1024 * 1024, file), &mut self.store)
|
||||
}
|
||||
|
||||
/// Returns the number of peers in the topology, excluding the local peer.
|
||||
#[inline]
|
||||
pub fn num_peers(&self) -> usize {
|
||||
self.store.len()
|
||||
}
|
||||
|
||||
/// Perform a cleanup pass, removing all obsolete addresses and peers.
|
||||
///
|
||||
/// This should be done from time to time.
|
||||
pub fn cleanup(&mut self) {
|
||||
let now_systime = SystemTime::now();
|
||||
self.store.retain(|_, peer| {
|
||||
let new_addrs = peer.addrs
|
||||
.drain(..)
|
||||
.filter(|a| a.expires > now_systime || a.is_connected())
|
||||
.collect();
|
||||
peer.addrs = new_addrs;
|
||||
!peer.addrs.is_empty()
|
||||
});
|
||||
}
|
||||
|
||||
/// Returns a list of all the known addresses of peers, ordered by the
|
||||
/// order in which we should attempt to connect to them.
|
||||
///
|
||||
/// Because of expiration and back-off mechanisms, this list can grow
|
||||
/// by itself over time. The `Instant` that is returned corresponds to
|
||||
/// the earlier known time when a new entry will be added automatically to
|
||||
/// the list.
|
||||
pub fn addrs_to_attempt(&mut self) -> (impl Iterator<Item = (&PeerId, &Multiaddr)>, Instant) {
|
||||
// TODO: optimize
|
||||
let now = Instant::now();
|
||||
let now_systime = SystemTime::now();
|
||||
let mut instant = now + Duration::from_secs(3600);
|
||||
let mut addrs_out = Vec::new();
|
||||
|
||||
let mut peer_addrs = Vec::new();
|
||||
|
||||
'peer_loop: for (peer, info) in &mut self.store {
|
||||
peer_addrs.clear();
|
||||
|
||||
for addr in &mut info.addrs {
|
||||
let (score, is_connected) = addr.score_and_is_connected();
|
||||
if is_connected {
|
||||
continue 'peer_loop
|
||||
}
|
||||
if score == 0 || addr.expires < now_systime {
|
||||
continue
|
||||
}
|
||||
if addr.back_off_until > now {
|
||||
instant = cmp::min(instant, addr.back_off_until);
|
||||
continue
|
||||
}
|
||||
|
||||
peer_addrs.push(((peer, &addr.addr), score));
|
||||
}
|
||||
|
||||
for val in peer_addrs.drain(..) {
|
||||
addrs_out.push(val);
|
||||
}
|
||||
}
|
||||
|
||||
addrs_out.sort_by(|a, b| b.1.cmp(&a.1));
|
||||
(addrs_out.into_iter().map(|a| a.0), instant)
|
||||
}
|
||||
|
||||
/// Adds an address corresponding to a boostrap node.
|
||||
///
|
||||
/// We assume that the address is valid, so its score starts very high.
|
||||
pub fn add_bootstrap_addr(&mut self, peer: &PeerId, addr: Multiaddr) {
|
||||
if *peer == self.local_peer_id {
|
||||
return
|
||||
}
|
||||
|
||||
let now_systime = SystemTime::now();
|
||||
let now = Instant::now();
|
||||
|
||||
let peer = peer_access(&mut self.store, peer);
|
||||
|
||||
let mut found = false;
|
||||
let new_addrs = peer.addrs
|
||||
.drain(..)
|
||||
.filter_map(|a| {
|
||||
if a.expires < now_systime && !a.is_connected() {
|
||||
return None
|
||||
}
|
||||
if a.addr == addr {
|
||||
found = true;
|
||||
}
|
||||
Some(a)
|
||||
})
|
||||
.collect();
|
||||
peer.addrs = new_addrs;
|
||||
|
||||
if !found {
|
||||
peer.addrs.push(Addr {
|
||||
addr,
|
||||
expires: now_systime + BOOTSTRAP_NODE_EXPIRATION,
|
||||
back_off_until: now,
|
||||
next_back_off: FIRST_CONNECT_FAIL_BACKOFF,
|
||||
score: AddrScore {
|
||||
connected_since: None,
|
||||
score: BOOTSTRAP_NODE_SCORE,
|
||||
latest_score_update: now,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Indicates the topology that we have discovered new addresses for a given node.
|
||||
///
|
||||
/// Returns `true` if the topology has changed in some way. Returns `false` if calling this
|
||||
/// method was a no-op.
|
||||
pub fn add_discovered_addrs<I>(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
addrs: I,
|
||||
) -> bool
|
||||
where I: Iterator<Item = (Multiaddr, bool)> {
|
||||
if *peer_id == self.local_peer_id {
|
||||
return false
|
||||
}
|
||||
|
||||
let mut addrs: Vec<_> = addrs.collect();
|
||||
|
||||
if addrs.len() > 40 {
|
||||
warn!(target: "sub-libp2p", "Attempt to add more than 40 addresses for {:?}", peer_id);
|
||||
addrs.truncate(40);
|
||||
}
|
||||
|
||||
let now_systime = SystemTime::now();
|
||||
let now = Instant::now();
|
||||
|
||||
let peer = peer_access(&mut self.store, peer_id);
|
||||
|
||||
let new_addrs = peer.addrs
|
||||
.drain(..)
|
||||
.filter(|a| {
|
||||
if a.expires < now_systime && !a.is_connected() {
|
||||
return false
|
||||
}
|
||||
addrs.retain(|(addr, _)| *addr != a.addr);
|
||||
true
|
||||
})
|
||||
.collect();
|
||||
peer.addrs = new_addrs;
|
||||
|
||||
let mut anything_changed = false;
|
||||
|
||||
if !addrs.is_empty() {
|
||||
anything_changed = true;
|
||||
trace!(
|
||||
target: "sub-libp2p",
|
||||
"Peer store: adding addresses {:?} for {:?}",
|
||||
addrs,
|
||||
peer_id,
|
||||
);
|
||||
}
|
||||
|
||||
'addrs_inserter: for (addr, connectable) in addrs {
|
||||
let initial_score = if connectable {
|
||||
DISCOVERY_INITIAL_SCORE_CONNECTABLE
|
||||
} else {
|
||||
DISCOVERY_INITIAL_SCORE
|
||||
};
|
||||
|
||||
// Enforce `MAX_ADDRESSES_PER_PEER` before inserting, or skip this entry.
|
||||
while peer.addrs.len() >= MAX_ADDRESSES_PER_PEER {
|
||||
let pos = peer.addrs.iter_mut().position(|addr| addr.score() <= initial_score);
|
||||
if let Some(pos) = pos {
|
||||
let _ = peer.addrs.remove(pos);
|
||||
} else {
|
||||
continue 'addrs_inserter;
|
||||
}
|
||||
}
|
||||
|
||||
// `addrs` can contain duplicates, therefore we would insert the same address twice.
|
||||
if peer.addrs.iter().any(|a| a.addr == addr) {
|
||||
continue;
|
||||
}
|
||||
|
||||
peer.addrs.push(Addr {
|
||||
addr,
|
||||
expires: now_systime + KADEMLIA_DISCOVERY_EXPIRATION,
|
||||
back_off_until: now,
|
||||
next_back_off: FIRST_CONNECT_FAIL_BACKOFF,
|
||||
score: AddrScore {
|
||||
connected_since: None,
|
||||
score: initial_score,
|
||||
latest_score_update: now,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
anything_changed
|
||||
}
|
||||
|
||||
/// Returns the list of peers that are stored in the topology.
|
||||
#[inline]
|
||||
pub fn known_peers(&self) -> impl Iterator<Item = &PeerId> {
|
||||
self.store.keys()
|
||||
}
|
||||
|
||||
/// Returns the addresses stored for a specific peer, and their reputation score.
|
||||
///
|
||||
/// If `include_expired` is true, includes expired addresses that shouldn't be taken into
|
||||
/// account when dialing.
|
||||
#[inline]
|
||||
pub fn addresses_of_peer(&mut self, peer: &PeerId, include_expired: bool)
|
||||
-> impl Iterator<Item = (&Multiaddr, u32)> {
|
||||
let now_st = SystemTime::now();
|
||||
let now_is = Instant::now();
|
||||
|
||||
let mut list = self.store.get_mut(peer).into_iter().flat_map(|p| p.addrs.iter_mut()).filter_map(move |addr| {
|
||||
let (score, connected) = addr.score_and_is_connected();
|
||||
if include_expired || (addr.expires >= now_st && score > 0 && addr.back_off_until < now_is) || connected {
|
||||
Some((score, &addr.addr))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}).collect::<Vec<_>>();
|
||||
list.sort_by(|a, b| a.0.cmp(&b.0));
|
||||
// TODO: meh, optimize
|
||||
list.into_iter().map(|(score, addr)| (addr, score))
|
||||
}
|
||||
|
||||
/// Marks the given peer as connected through the given endpoint.
|
||||
pub fn set_connected(&mut self, peer: &PeerId, endpoint: &ConnectedPoint) {
|
||||
if *peer == self.local_peer_id {
|
||||
return
|
||||
}
|
||||
|
||||
let addr = match endpoint {
|
||||
ConnectedPoint::Dialer { address } => address,
|
||||
ConnectedPoint::Listener { .. } => return
|
||||
};
|
||||
|
||||
let now = Instant::now();
|
||||
|
||||
// Just making sure that we have an entry for this peer in `store`, but don't use it.
|
||||
let _ = peer_access(&mut self.store, peer);
|
||||
|
||||
for (peer_in_store, info_in_store) in self.store.iter_mut() {
|
||||
if peer == peer_in_store {
|
||||
if let Some(addr) = info_in_store.addrs.iter_mut().find(|a| &a.addr == addr) {
|
||||
addr.connected_now(CONNECTED_MINIMUM_SCORE);
|
||||
addr.back_off_until = now;
|
||||
addr.next_back_off = FIRST_CONNECT_FAIL_BACKOFF;
|
||||
continue
|
||||
}
|
||||
|
||||
info_in_store.addrs.push(Addr {
|
||||
addr: addr.clone(),
|
||||
expires: SystemTime::now() + EXPIRATION_PUSH_BACK_CONNEC,
|
||||
back_off_until: now,
|
||||
next_back_off: FIRST_CONNECT_FAIL_BACKOFF,
|
||||
score: AddrScore {
|
||||
connected_since: Some(now),
|
||||
latest_score_update: now,
|
||||
score: CONNECTED_MINIMUM_SCORE,
|
||||
},
|
||||
});
|
||||
|
||||
} else {
|
||||
// Set the score to 0 for any address that matches the one we connected to.
|
||||
for addr_in_store in &mut info_in_store.addrs {
|
||||
if &addr_in_store.addr == addr {
|
||||
addr_in_store.adjust_score(-(MAX_SCORE as i32));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Marks the given peer as disconnected. The endpoint is the one we were connected to.
|
||||
pub fn set_disconnected(&mut self, _: &PeerId, endpoint: &ConnectedPoint) {
|
||||
let addr = match endpoint {
|
||||
ConnectedPoint::Dialer { address } => address,
|
||||
ConnectedPoint::Listener { .. } => return
|
||||
};
|
||||
|
||||
// Note that we used to have different score values here in the past, but there really
|
||||
// isn't much point in doing so in practice.
|
||||
let score_diff = -3;
|
||||
|
||||
for info in self.store.values_mut() {
|
||||
for a in info.addrs.iter_mut() {
|
||||
if &a.addr == addr {
|
||||
a.disconnected_now(score_diff);
|
||||
a.back_off_until = Instant::now() + a.next_back_off;
|
||||
a.next_back_off = cmp::min(a.next_back_off * FAIL_BACKOFF_MULTIPLIER, MAX_BACKOFF);
|
||||
let expires_push_back = SystemTime::now() + EXPIRATION_PUSH_BACK_CONNEC;
|
||||
if a.expires < expires_push_back {
|
||||
a.expires = expires_push_back;
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Indicates to the topology that we failed to reach a node when dialing the given address.
|
||||
pub fn set_unreachable(&mut self, addr: &Multiaddr) {
|
||||
for info in self.store.values_mut() {
|
||||
for a in info.addrs.iter_mut() {
|
||||
if &a.addr != addr {
|
||||
continue
|
||||
}
|
||||
|
||||
// It is possible that we are connected to this address, and that the dial failure
|
||||
// concerns another peer.
|
||||
if a.is_connected() {
|
||||
continue
|
||||
}
|
||||
|
||||
a.adjust_score(SCORE_DIFF_ON_FAILED_TO_CONNECT);
|
||||
trace!(target: "sub-libp2p", "Back off for {} = {:?}", addr, a.next_back_off);
|
||||
a.back_off_until = Instant::now() + a.next_back_off;
|
||||
a.next_back_off = cmp::min(a.next_back_off * FAIL_BACKOFF_MULTIPLIER, MAX_BACKOFF);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn peer_access<'a>(store: &'a mut FnvHashMap<PeerId, PeerInfo>, peer: &PeerId) -> &'a mut PeerInfo {
|
||||
// TODO: should be optimizable if HashMap gets a better API
|
||||
store.entry(peer.clone()).or_insert_with(Default::default)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct PeerInfo {
|
||||
/// Addresses of that peer.
|
||||
addrs: Vec<Addr>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Addr {
|
||||
/// The multiaddress.
|
||||
addr: Multiaddr,
|
||||
/// When the address expires.
|
||||
expires: SystemTime,
|
||||
next_back_off: Duration,
|
||||
/// Don't try to connect to this node until `Instant`.
|
||||
back_off_until: Instant,
|
||||
score: AddrScore,
|
||||
}
|
||||
|
||||
impl Clone for Addr {
|
||||
fn clone(&self) -> Addr {
|
||||
Addr {
|
||||
addr: self.addr.clone(),
|
||||
expires: self.expires,
|
||||
next_back_off: self.next_back_off,
|
||||
back_off_until: self.back_off_until,
|
||||
score: self.score.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct AddrScore {
|
||||
/// If connected, contains the moment when we connected. `None` if we're not connected.
|
||||
connected_since: Option<Instant>,
|
||||
/// Score of this address. Potentially needs to be updated based on `latest_score_update`.
|
||||
score: u32,
|
||||
/// When we last updated the score.
|
||||
latest_score_update: Instant,
|
||||
}
|
||||
|
||||
impl Addr {
|
||||
/// Sets the addr to connected. If the score is lower than the given value, raises it to this
|
||||
/// value.
|
||||
fn connected_now(&mut self, raise_to_min: u32) {
|
||||
let now = Instant::now();
|
||||
Addr::flush(&mut self.score, now);
|
||||
self.score.connected_since = Some(now);
|
||||
if self.score.score < raise_to_min {
|
||||
self.score.score = raise_to_min;
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies a modification to the score.
|
||||
fn adjust_score(&mut self, score_diff: i32) {
|
||||
Addr::flush(&mut self.score, Instant::now());
|
||||
if score_diff >= 0 {
|
||||
self.score.score = cmp::min(MAX_SCORE, self.score.score + score_diff as u32);
|
||||
} else {
|
||||
self.score.score = self.score.score.saturating_sub(-score_diff as u32);
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the addr to disconnected and applies a modification to the score.
|
||||
fn disconnected_now(&mut self, score_diff: i32) {
|
||||
Addr::flush(&mut self.score, Instant::now());
|
||||
self.score.connected_since = None;
|
||||
if score_diff >= 0 {
|
||||
self.score.score = cmp::min(MAX_SCORE, self.score.score + score_diff as u32);
|
||||
} else {
|
||||
self.score.score = self.score.score.saturating_sub(-score_diff as u32);
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if we are connected to this addr.
|
||||
fn is_connected(&self) -> bool {
|
||||
self.score.connected_since.is_some()
|
||||
}
|
||||
|
||||
/// Returns the score, and true if we are connected to this addr.
|
||||
fn score_and_is_connected(&mut self) -> (u32, bool) {
|
||||
Addr::flush(&mut self.score, Instant::now());
|
||||
let is_connected = self.score.connected_since.is_some();
|
||||
(self.score.score, is_connected)
|
||||
}
|
||||
|
||||
/// Updates `score` and `latest_score_update`, and returns the score.
|
||||
fn score(&mut self) -> u32 {
|
||||
Addr::flush(&mut self.score, Instant::now());
|
||||
self.score.score
|
||||
}
|
||||
|
||||
fn flush(score: &mut AddrScore, now: Instant) {
|
||||
if let Some(connected_since) = score.connected_since {
|
||||
let potential_score: u32 = div_dur_with_dur(now - connected_since, CONNEC_DURATION_PER_SCORE);
|
||||
// We flush when we connect to an address.
|
||||
debug_assert!(score.latest_score_update >= connected_since);
|
||||
let effective_score: u32 =
|
||||
div_dur_with_dur(score.latest_score_update - connected_since, CONNEC_DURATION_PER_SCORE);
|
||||
let to_add = potential_score.saturating_sub(effective_score);
|
||||
score.score = cmp::min(MAX_SCORE, score.score + to_add);
|
||||
}
|
||||
|
||||
score.latest_score_update = now;
|
||||
}
|
||||
}
|
||||
|
||||
/// Divides a `Duration` with a `Duration`. This exists in the stdlib but isn't stable yet.
|
||||
// TODO: remove this function once stable
|
||||
fn div_dur_with_dur(a: Duration, b: Duration) -> u32 {
|
||||
let a_ms = a.as_secs() * 1_000_000 + u64::from(a.subsec_micros());
|
||||
let b_ms = b.as_secs() * 1_000_000 + u64::from(b.subsec_micros());
|
||||
(a_ms / b_ms) as u32
|
||||
}
|
||||
|
||||
/// Serialized version of a `PeerInfo`. Suitable for storage in the cache file.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct SerializedPeerInfo {
|
||||
addrs: Vec<SerializedAddr>,
|
||||
}
|
||||
|
||||
/// Serialized version of an `Addr`. Suitable for storage in the cache file.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct SerializedAddr {
|
||||
addr: String,
|
||||
expires: SystemTime,
|
||||
score: u32,
|
||||
}
|
||||
|
||||
impl<'a> From<&'a mut Addr> for SerializedAddr {
|
||||
fn from(addr: &'a mut Addr) -> SerializedAddr {
|
||||
SerializedAddr {
|
||||
addr: addr.addr.to_string(),
|
||||
expires: addr.expires,
|
||||
score: addr.score(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to load storage from a file.
|
||||
/// Ignores any entry equal to `local_peer_id`.
|
||||
/// Deletes the file and returns an empty map if the file doesn't exist, cannot be opened
|
||||
/// or is corrupted.
|
||||
fn try_load(path: impl AsRef<Path>, local_peer_id: &PeerId) -> FnvHashMap<PeerId, PeerInfo> {
|
||||
let path = path.as_ref();
|
||||
if !path.exists() {
|
||||
debug!(target: "sub-libp2p", "Peer storage file {:?} doesn't exist", path);
|
||||
return Default::default()
|
||||
}
|
||||
|
||||
let mut file = match fs::File::open(path) {
|
||||
// TODO: the capacity of the BufReader is kind of arbitrary ; decide better
|
||||
Ok(f) => BufReader::with_capacity(1024 * 1024, f),
|
||||
Err(err) => {
|
||||
warn!(target: "sub-libp2p", "Failed to open peer storage file: {:?}", err);
|
||||
info!(target: "sub-libp2p", "Deleting peer storage file {:?}", path);
|
||||
let _ = fs::remove_file(path);
|
||||
return Default::default()
|
||||
}
|
||||
};
|
||||
|
||||
// We want to support empty files (and treat them as an empty recordset). Unfortunately
|
||||
// `serde_json` will always produce an error if we do this ("unexpected EOF at line 0
|
||||
// column 0"). Therefore we start by reading one byte from the file in order to check
|
||||
// for EOF.
|
||||
|
||||
let mut first_byte = [0];
|
||||
let num_read = match file.read(&mut first_byte) {
|
||||
Ok(f) => f,
|
||||
Err(err) => {
|
||||
// TODO: DRY
|
||||
warn!(target: "sub-libp2p", "Failed to read peer storage file: {:?}", err);
|
||||
info!(target: "sub-libp2p", "Deleting peer storage file {:?}", path);
|
||||
let _ = fs::remove_file(path);
|
||||
return Default::default()
|
||||
}
|
||||
};
|
||||
|
||||
if num_read == 0 {
|
||||
// File is empty.
|
||||
debug!(target: "sub-libp2p", "Peer storage file {:?} is empty", path);
|
||||
Default::default()
|
||||
|
||||
} else {
|
||||
let data = Cursor::new(first_byte).chain(file);
|
||||
match serde_json::from_reader::<_, serde_json::Value>(data) {
|
||||
Ok(serde_json::Value::Null) => Default::default(),
|
||||
Ok(serde_json::Value::Object(map)) =>
|
||||
deserialize_tolerant(map.into_iter(), local_peer_id),
|
||||
Ok(_) | Err(_) => {
|
||||
// The `Ok(_)` case means that the file doesn't contain a map.
|
||||
let _ = fs::remove_file(path);
|
||||
Default::default()
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to turn a deserialized version of the storage into the final version.
|
||||
///
|
||||
/// Skips entries that are invalid or equal to `local_peer_id`.
|
||||
fn deserialize_tolerant(
|
||||
iter: impl Iterator<Item = (String, serde_json::Value)>,
|
||||
local_peer_id: &PeerId
|
||||
) -> FnvHashMap<PeerId, PeerInfo> {
|
||||
let now = Instant::now();
|
||||
let now_systime = SystemTime::now();
|
||||
|
||||
let mut out = FnvHashMap::default();
|
||||
for (peer, info) in iter {
|
||||
let peer: PeerId = match peer.parse() {
|
||||
Ok(p) => p,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
if &peer == local_peer_id {
|
||||
continue
|
||||
}
|
||||
|
||||
let info: SerializedPeerInfo = match serde_json::from_value(info) {
|
||||
Ok(i) => i,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
let mut addrs = Vec::with_capacity(info.addrs.len());
|
||||
for addr in info.addrs {
|
||||
let multiaddr = match addr.addr.parse() {
|
||||
Ok(a) => a,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
if addr.expires < now_systime {
|
||||
continue
|
||||
}
|
||||
|
||||
addrs.push(Addr {
|
||||
addr: multiaddr,
|
||||
expires: addr.expires,
|
||||
next_back_off: FIRST_CONNECT_FAIL_BACKOFF,
|
||||
back_off_until: now,
|
||||
score: AddrScore {
|
||||
connected_since: None,
|
||||
score: addr.score,
|
||||
latest_score_update: now,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
if addrs.is_empty() {
|
||||
continue
|
||||
}
|
||||
|
||||
out.insert(peer, PeerInfo { addrs });
|
||||
}
|
||||
|
||||
out
|
||||
}
|
||||
|
||||
/// Attempts to turn a deserialized version of the storage into the final version.
|
||||
///
|
||||
/// Skips entries that are invalid or expired.
|
||||
fn serialize<W: Write>(out: W, map: &mut FnvHashMap<PeerId, PeerInfo>) -> Result<(), IoError> {
|
||||
let now = SystemTime::now();
|
||||
let array: FnvHashMap<_, _> = map.iter_mut().filter_map(|(peer, info)| {
|
||||
if info.addrs.is_empty() {
|
||||
return None
|
||||
}
|
||||
|
||||
let peer = peer.to_base58();
|
||||
let info = SerializedPeerInfo {
|
||||
addrs: info.addrs.iter_mut()
|
||||
.filter_map(|a| if a.expires > now || a.is_connected() {
|
||||
Some(a.into())
|
||||
} else {
|
||||
None
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
||||
Some((peer, info))
|
||||
}).collect();
|
||||
|
||||
serde_json::to_writer_pretty(out, &array)
|
||||
.map_err(|err| IoError::new(IoErrorKind::Other, err))
|
||||
}
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
use crate::ProtocolId;
|
||||
use bytes::Bytes;
|
||||
use libp2p::core::{Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
|
||||
use libp2p::core::{Negotiated, Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
|
||||
use libp2p::tokio_codec::Framed;
|
||||
use log::warn;
|
||||
use std::{collections::VecDeque, io, iter, marker::PhantomData, vec::IntoIter as VecIntoIter};
|
||||
@@ -92,7 +92,7 @@ pub struct RegisteredProtocolSubstream<TMessage, TSubstream> {
|
||||
/// If true, we should call `poll_complete` on the inner sink.
|
||||
requires_poll_complete: bool,
|
||||
/// The underlying substream.
|
||||
inner: stream::Fuse<Framed<TSubstream, UviBytes<Vec<u8>>>>,
|
||||
inner: stream::Fuse<Framed<Negotiated<TSubstream>, UviBytes<Vec<u8>>>>,
|
||||
/// Id of the protocol.
|
||||
protocol_id: ProtocolId,
|
||||
/// Version of the protocol that was negotiated.
|
||||
@@ -385,7 +385,7 @@ where TSubstream: AsyncRead + AsyncWrite,
|
||||
|
||||
fn upgrade_inbound(
|
||||
self,
|
||||
socket: TSubstream,
|
||||
socket: Negotiated<TSubstream>,
|
||||
info: Self::Info,
|
||||
) -> Self::Future {
|
||||
let framed = {
|
||||
@@ -418,7 +418,7 @@ where TSubstream: AsyncRead + AsyncWrite,
|
||||
|
||||
fn upgrade_outbound(
|
||||
self,
|
||||
socket: TSubstream,
|
||||
socket: Negotiated<TSubstream>,
|
||||
info: Self::Info,
|
||||
) -> Self::Future {
|
||||
let framed = Framed::new(socket, UviBytes::default());
|
||||
|
||||
@@ -106,12 +106,6 @@ pub struct NetworkState {
|
||||
/// List of addresses the node is currently listening on.
|
||||
pub listened_addresses: HashSet<Multiaddr>,
|
||||
// TODO (https://github.com/libp2p/rust-libp2p/issues/978): external_addresses: Vec<Multiaddr>,
|
||||
/// If true, we only accept reserved peers.
|
||||
pub is_reserved_only: bool,
|
||||
/// PeerIds of the nodes that are marked as reserved.
|
||||
pub reserved_peers: HashSet<String>,
|
||||
/// PeerIds of the nodes that are banned, and how long in the seconds the ban remains.
|
||||
pub banned_peers: HashMap<String, u64>,
|
||||
/// List of node we're connected to.
|
||||
pub connected_peers: HashMap<String, NetworkStatePeer>,
|
||||
/// List of node that we know of but that we're not connected to.
|
||||
@@ -137,15 +131,15 @@ pub struct NetworkStatePeer {
|
||||
/// If true, the peer is "open", which means that we have a Substrate-related protocol
|
||||
/// with this peer.
|
||||
pub open: bool,
|
||||
/// List of addresses known for this node, with its reputation score.
|
||||
pub known_addresses: HashMap<Multiaddr, u32>,
|
||||
/// List of addresses known for this node.
|
||||
pub known_addresses: HashSet<Multiaddr>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NetworkStateNotConnectedPeer {
|
||||
/// List of addresses known for this node, with its reputation score.
|
||||
pub known_addresses: HashMap<Multiaddr, u32>,
|
||||
/// List of addresses known for this node.
|
||||
pub known_addresses: HashSet<Multiaddr>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Serialize)]
|
||||
|
||||
@@ -19,20 +19,19 @@ use crate::{
|
||||
transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer
|
||||
};
|
||||
use crate::custom_proto::{CustomMessage, RegisteredProtocol};
|
||||
use crate::{NetworkConfiguration, NodeIndex, parse_str_addr};
|
||||
use crate::{NetworkConfiguration, NonReservedPeerMode, NodeIndex, parse_str_addr};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{prelude::*, Stream};
|
||||
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
|
||||
use libp2p::{multiaddr::Protocol, Multiaddr, core::swarm::NetworkBehaviour, PeerId};
|
||||
use libp2p::core::{Swarm, nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox};
|
||||
use libp2p::core::nodes::ConnectedPoint;
|
||||
use log::{debug, error, info, warn};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::fs;
|
||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||
use std::io::Error as IoError;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio_timer::Interval;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Starts the substrate libp2p service.
|
||||
///
|
||||
@@ -40,13 +39,48 @@ use tokio_timer::Interval;
|
||||
pub fn start_service<TMessage>(
|
||||
config: NetworkConfiguration,
|
||||
registered_custom: RegisteredProtocol<TMessage>,
|
||||
) -> Result<Service<TMessage>, IoError>
|
||||
) -> Result<(Service<TMessage>, Arc<substrate_peerset::Peerset>), IoError>
|
||||
where TMessage: CustomMessage + Send + 'static {
|
||||
|
||||
if let Some(ref path) = config.net_config_path {
|
||||
fs::create_dir_all(Path::new(path))?;
|
||||
}
|
||||
|
||||
// List of multiaddresses that we know in the network.
|
||||
let mut known_addresses = Vec::new();
|
||||
let mut bootnodes = Vec::new();
|
||||
let mut reserved_nodes = Vec::new();
|
||||
|
||||
// Process the bootnodes.
|
||||
for bootnode in config.boot_nodes.iter() {
|
||||
match parse_str_addr(bootnode) {
|
||||
Ok((peer_id, addr)) => {
|
||||
bootnodes.push(peer_id.clone());
|
||||
known_addresses.push((peer_id, addr));
|
||||
},
|
||||
Err(_) => warn!(target: "sub-libp2p", "Not a valid bootnode address: {}", bootnode),
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize the reserved peers.
|
||||
for reserved in config.reserved_nodes.iter() {
|
||||
if let Ok((peer_id, addr)) = parse_str_addr(reserved) {
|
||||
reserved_nodes.push(peer_id.clone());
|
||||
known_addresses.push((peer_id, addr));
|
||||
} else {
|
||||
warn!(target: "sub-libp2p", "Not a valid reserved node address: {}", reserved);
|
||||
}
|
||||
}
|
||||
|
||||
// Build the peerset.
|
||||
let (peerset, peerset_receiver) = substrate_peerset::Peerset::from_config(substrate_peerset::PeersetConfig {
|
||||
in_peers: 25,
|
||||
out_peers: 25,
|
||||
bootnodes,
|
||||
reserved_only: config.non_reserved_mode == NonReservedPeerMode::Deny,
|
||||
reserved_nodes,
|
||||
});
|
||||
|
||||
// Private and public keys configuration.
|
||||
let local_identity = config.node_key.clone().into_keypair()?;
|
||||
let local_public = local_identity.public();
|
||||
@@ -54,7 +88,8 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
|
||||
// Build the swarm.
|
||||
let (mut swarm, bandwidth) = {
|
||||
let behaviour = Behaviour::new(&config, local_public, registered_custom);
|
||||
let user_agent = format!("{} ({})", config.client_version, config.node_name);
|
||||
let behaviour = Behaviour::new(user_agent, local_public, registered_custom, known_addresses, peerset_receiver);
|
||||
let (transport, bandwidth) = transport::build_transport(local_identity);
|
||||
(Swarm::new(transport, behaviour, local_peer_id.clone()), bandwidth)
|
||||
};
|
||||
@@ -75,36 +110,16 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
Swarm::add_external_address(&mut swarm, addr.clone());
|
||||
}
|
||||
|
||||
// Connect to the bootnodes.
|
||||
for bootnode in config.boot_nodes.iter() {
|
||||
match parse_str_addr(bootnode) {
|
||||
Ok((peer_id, _)) => Swarm::dial(&mut swarm, peer_id),
|
||||
Err(_) => warn!(target: "sub-libp2p", "Not a valid bootnode address: {}", bootnode),
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize the reserved peers.
|
||||
for reserved in config.reserved_nodes.iter() {
|
||||
if let Ok((peer_id, addr)) = parse_str_addr(reserved) {
|
||||
swarm.add_reserved_peer(peer_id.clone(), addr);
|
||||
Swarm::dial(&mut swarm, peer_id);
|
||||
} else {
|
||||
warn!(target: "sub-libp2p", "Not a valid reserved node address: {}", reserved);
|
||||
}
|
||||
}
|
||||
|
||||
debug!(target: "sub-libp2p", "Topology started with {} entries",
|
||||
swarm.num_topology_peers());
|
||||
|
||||
Ok(Service {
|
||||
let service = Service {
|
||||
swarm,
|
||||
bandwidth,
|
||||
nodes_info: Default::default(),
|
||||
index_by_id: Default::default(),
|
||||
next_node_id: 1,
|
||||
cleanup: Interval::new_interval(Duration::from_secs(60)),
|
||||
injected_events: Vec::new(),
|
||||
})
|
||||
};
|
||||
|
||||
Ok((service, peerset))
|
||||
}
|
||||
|
||||
/// Event produced by the service.
|
||||
@@ -164,10 +179,6 @@ pub struct Service<TMessage> where TMessage: CustomMessage {
|
||||
/// Next index to assign to a node.
|
||||
next_node_id: NodeIndex,
|
||||
|
||||
/// Stream that fires when we need to cleanup and flush the topology, and cleanup the disabled
|
||||
/// peers.
|
||||
cleanup: Interval,
|
||||
|
||||
/// Events to produce on the Stream.
|
||||
injected_events: Vec<ServiceEvent<TMessage>>,
|
||||
}
|
||||
@@ -189,13 +200,11 @@ impl<TMessage> Service<TMessage>
|
||||
where TMessage: CustomMessage + Send + 'static {
|
||||
/// Returns a struct containing tons of useful information about the network.
|
||||
pub fn state(&mut self) -> NetworkState {
|
||||
let now = Instant::now();
|
||||
|
||||
let connected_peers = {
|
||||
let swarm = &mut self.swarm;
|
||||
self.nodes_info.values().map(move |info| {
|
||||
let known_addresses = swarm.known_addresses(&info.peer_id)
|
||||
.map(|(a, s)| (a.clone(), s)).collect();
|
||||
let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, &info.peer_id)
|
||||
.into_iter().collect();
|
||||
|
||||
(info.peer_id.to_base58(), NetworkStatePeer {
|
||||
endpoint: info.endpoint.clone().into(),
|
||||
@@ -214,10 +223,9 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
let list = swarm.known_peers().filter(|p| !index_by_id.contains_key(p))
|
||||
.cloned().collect::<Vec<_>>();
|
||||
list.into_iter().map(move |peer_id| {
|
||||
let known_addresses = swarm.known_addresses(&peer_id)
|
||||
.map(|(a, s)| (a.clone(), s)).collect();
|
||||
(peer_id.to_base58(), NetworkStateNotConnectedPeer {
|
||||
known_addresses,
|
||||
known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id)
|
||||
.into_iter().collect(),
|
||||
})
|
||||
}).collect()
|
||||
};
|
||||
@@ -225,12 +233,6 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
NetworkState {
|
||||
peer_id: Swarm::local_peer_id(&self.swarm).to_base58(),
|
||||
listened_addresses: Swarm::listeners(&self.swarm).cloned().collect(),
|
||||
reserved_peers: self.swarm.reserved_peers().map(|p| p.to_base58()).collect(),
|
||||
banned_peers: self.swarm.banned_nodes().map(|(p, until)| {
|
||||
let dur = if until > now { until - now } else { Duration::new(0, 0) };
|
||||
(p.to_base58(), dur.as_secs())
|
||||
}).collect(),
|
||||
is_reserved_only: self.swarm.is_reserved_only(),
|
||||
average_download_per_sec: self.bandwidth.average_download_per_sec(),
|
||||
average_upload_per_sec: self.bandwidth.average_upload_per_sec(),
|
||||
connected_peers,
|
||||
@@ -268,31 +270,6 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
self.nodes_info.keys().cloned()
|
||||
}
|
||||
|
||||
/// Try to add a reserved peer.
|
||||
pub fn add_reserved_peer(&mut self, peer_id: PeerId, addr: Multiaddr) {
|
||||
self.swarm.add_reserved_peer(peer_id, addr);
|
||||
}
|
||||
|
||||
/// Try to remove a reserved peer.
|
||||
///
|
||||
/// If we are in reserved mode and we were connected to a node with this peer ID, then this
|
||||
/// method will disconnect it.
|
||||
pub fn remove_reserved_peer(&mut self, peer_id: PeerId) {
|
||||
self.swarm.remove_reserved_peer(peer_id);
|
||||
}
|
||||
|
||||
/// Start accepting all peers again if we weren't.
|
||||
#[inline]
|
||||
pub fn accept_unreserved_peers(&mut self) {
|
||||
self.swarm.accept_unreserved_peers();
|
||||
}
|
||||
|
||||
/// Start refusing non-reserved nodes. Disconnects the nodes that we are connected to that
|
||||
/// aren't reserved.
|
||||
pub fn deny_unreserved_peers(&mut self) {
|
||||
self.swarm.deny_unreserved_peers();
|
||||
}
|
||||
|
||||
/// Returns the `PeerId` of a node.
|
||||
#[inline]
|
||||
pub fn peer_id_of_node(&self, node_index: NodeIndex) -> Option<&PeerId> {
|
||||
@@ -327,23 +304,10 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
}
|
||||
}
|
||||
|
||||
/// Disconnects a peer and bans it for a little while.
|
||||
///
|
||||
/// Same as `drop_node`, except that the same peer will not be able to reconnect later.
|
||||
#[inline]
|
||||
pub fn ban_node(&mut self, node_index: NodeIndex) {
|
||||
if let Some(info) = self.nodes_info.get(&node_index) {
|
||||
info!(target: "sub-libp2p", "Banned {:?} (#{:?}, {:?}, {:?})", info.peer_id,
|
||||
node_index, info.endpoint, info.client_version);
|
||||
self.swarm.ban_node(info.peer_id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
/// Disconnects a peer.
|
||||
///
|
||||
/// This is asynchronous and will not immediately close the peer.
|
||||
/// Corresponding closing events will be generated once the closing actually happens.
|
||||
#[inline]
|
||||
pub fn drop_node(&mut self, node_index: NodeIndex) {
|
||||
if let Some(info) = self.nodes_info.get(&node_index) {
|
||||
debug!(target: "sub-libp2p", "Dropping {:?} on purpose (#{:?}, {:?}, {:?})",
|
||||
@@ -352,6 +316,11 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds a hard-coded address for the given peer, that never expires.
|
||||
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
|
||||
self.swarm.add_known_address(peer_id, addr)
|
||||
}
|
||||
|
||||
/// Get debug info for a given peer.
|
||||
pub fn peer_debug_info(&self, who: NodeIndex) -> String {
|
||||
if let Some(info) = self.nodes_info.get(&who) {
|
||||
@@ -394,7 +363,6 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
loop {
|
||||
match self.swarm.poll() {
|
||||
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { peer_id, version, endpoint }))) => {
|
||||
debug!(target: "sub-libp2p", "Opened custom protocol with {:?}", peer_id);
|
||||
let node_index = self.index_of_peer_or_assign(peer_id.clone(), endpoint);
|
||||
break Ok(Async::Ready(Some(ServiceEvent::OpenedCustomProtocol {
|
||||
peer_id,
|
||||
@@ -403,8 +371,7 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
debug_info: self.peer_debug_info(node_index),
|
||||
})))
|
||||
}
|
||||
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { peer_id, result }))) => {
|
||||
debug!(target: "sub-libp2p", "Custom protocol with {:?} closed: {:?}", peer_id, result);
|
||||
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { peer_id, .. }))) => {
|
||||
let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
|
||||
break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol {
|
||||
node_index,
|
||||
@@ -457,40 +424,6 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Polls the stream that fires when we need to cleanup and flush the topology.
|
||||
fn poll_cleanup(&mut self) -> Poll<Option<ServiceEvent<TMessage>>, IoError> {
|
||||
loop {
|
||||
match self.cleanup.poll() {
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Ok(Async::Ready(Some(_))) => {
|
||||
debug!(target: "sub-libp2p", "Cleaning and flushing topology");
|
||||
self.swarm.cleanup();
|
||||
if let Err(err) = self.swarm.flush_topology() {
|
||||
warn!(target: "sub-libp2p", "Failed to flush topology: {:?}", err);
|
||||
}
|
||||
debug!(target: "sub-libp2p", "Topology now contains {} nodes",
|
||||
self.swarm.num_topology_peers());
|
||||
}
|
||||
Ok(Async::Ready(None)) => {
|
||||
warn!(target: "sub-libp2p", "Topology flush stream ended unexpectedly");
|
||||
return Ok(Async::Ready(None))
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(target: "sub-libp2p", "Topology flush stream errored: {:?}", err);
|
||||
return Err(IoError::new(IoErrorKind::Other, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage> Drop for Service<TMessage> where TMessage: CustomMessage {
|
||||
fn drop(&mut self) {
|
||||
if let Err(err) = self.swarm.flush_topology() {
|
||||
warn!(target: "sub-libp2p", "Failed to flush topology: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage> Stream for Service<TMessage> where TMessage: CustomMessage + Send + 'static {
|
||||
@@ -507,11 +440,6 @@ impl<TMessage> Stream for Service<TMessage> where TMessage: CustomMessage + Send
|
||||
Async::NotReady => (),
|
||||
}
|
||||
|
||||
match self.poll_cleanup()? {
|
||||
Async::Ready(value) => return Ok(Async::Ready(value)),
|
||||
Async::NotReady => (),
|
||||
}
|
||||
|
||||
// The only way we reach this is if we went through all the `NotReady` paths above,
|
||||
// ensuring the current task is registered everywhere.
|
||||
Ok(Async::NotReady)
|
||||
|
||||
Reference in New Issue
Block a user