Update libp2p to v0.3 (#1634)

* Update libp2p

* Some more diagnostics

* 30 seconds back to 5 seconds

* Bump libp2p-core and improve test

* Fix runtime Cargo.lock

* More work

* Finish upgrade to libp2p 0.3

* Add a maximum of 60 seconds for the rounds

* Remove env_logger

* Update Cargo.lock

* Update Cargo.lock in test-runtime

* Fix test compilation

* Make the test pass

* Add identify addresses to Kademlia

* Don't connect to nodes we're already connected to

* Add warning for non-Substrate nodes

* Fix external address not added

* Start in Enabled mode
This commit is contained in:
Pierre Krieger
2019-02-06 16:39:22 +01:00
committed by Arkadiy Paronyan
parent 7d8ae2df5c
commit b6fd967dfb
18 changed files with 599 additions and 411 deletions
+64 -20
View File
@@ -19,11 +19,11 @@ use crate::{NetworkConfiguration, ProtocolId};
use bytes::Bytes;
use futures::prelude::*;
use libp2p::NetworkBehaviour;
use libp2p::core::{PeerId, ProtocolsHandler};
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey};
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters};
use libp2p::identify::{Identify, IdentifyEvent, protocol::IdentifyInfo};
use libp2p::kad::{Kademlia, KademliaOut, KademliaTopology};
use libp2p::kad::{Kademlia, KademliaOut, KadConnectionType};
use libp2p::ping::{Ping, PingEvent};
use log::{debug, trace, warn};
use std::{cmp, io, time::Duration, time::Instant};
@@ -51,17 +51,20 @@ pub struct Behaviour<TSubstream> {
impl<TSubstream> Behaviour<TSubstream> {
/// Builds a new `Behaviour`.
// TODO: redundancy between config and local_peer_id (https://github.com/libp2p/rust-libp2p/issues/745)
pub fn new(config: &NetworkConfiguration, local_peer_id: PeerId, protocols: RegisteredProtocols) -> Self {
// TODO: redundancy between config and local_public_key (https://github.com/libp2p/rust-libp2p/issues/745)
pub fn new(config: &NetworkConfiguration, local_public_key: PublicKey, protocols: RegisteredProtocols) -> Self {
let identify = {
let proto_version = "/substrate/1.0".to_string();
let user_agent = format!("{} ({})", config.client_version, config.node_name);
Identify::new(proto_version, user_agent)
Identify::new(proto_version, user_agent, local_public_key.clone())
};
let local_peer_id = local_public_key.into_peer_id();
let custom_protocols = CustomProtos::new(config, &local_peer_id, protocols);
Behaviour {
ping: Ping::new(),
custom_protocols: CustomProtos::new(config, protocols),
custom_protocols,
discovery: DiscoveryBehaviour::new(local_peer_id),
identify,
events: Vec::new(),
@@ -79,9 +82,26 @@ impl<TSubstream> Behaviour<TSubstream> {
self.custom_protocols.send_packet(target, protocol_id, data)
}
/// Returns the number of peers in the topology.
pub fn num_topology_peers(&self) -> usize {
self.custom_protocols.num_topology_peers()
}
/// Flushes the topology to the disk.
pub fn flush_topology(&mut self) -> Result<(), io::Error> {
self.custom_protocols.flush_topology()
}
/// Perform a cleanup pass, removing all obsolete addresses and peers.
///
/// This should be done from time to time.
pub fn cleanup(&mut self) {
self.custom_protocols.cleanup();
}
/// Try to add a reserved peer.
pub fn add_reserved_peer(&mut self, peer_id: PeerId) {
self.custom_protocols.add_reserved_peer(peer_id)
pub fn add_reserved_peer(&mut self, peer_id: PeerId, addr: Multiaddr) {
self.custom_protocols.add_reserved_peer(peer_id, addr)
}
/// Try to remove a reserved peer.
@@ -218,6 +238,20 @@ impl<TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TSubs
// TODO: ideally we would delay the first identification to when we open the custom
// protocol, so that we only report id info to the service about the nodes we
// care about (https://github.com/libp2p/rust-libp2p/issues/876)
if !info.protocol_version.contains("substrate") {
warn!(target: "sub-libp2p", "Connected to a non-Substrate node: {:?}", info);
}
if info.listen_addrs.is_empty() {
warn!(target: "sub-libp2p", "Received identify response with empty list of \
addresses");
}
for addr in &info.listen_addrs {
self.discovery.kademlia.add_address(&peer_id, addr.clone());
}
self.custom_protocols.add_discovered_addrs(
&peer_id,
info.listen_addrs.iter().map(|addr| (addr.clone(), true))
);
self.events.push(BehaviourOut::Identified { peer_id, info });
}
IdentifyEvent::Error { .. } => {}
@@ -227,10 +261,13 @@ impl<TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TSubs
impl<TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behaviour<TSubstream> {
fn inject_event(&mut self, out: KademliaOut) {
// We only ever use Kademlia for discovering nodes, and nodes discovered by Kademlia are
// automatically added to the topology. Therefore we don't need to perform any further
// action.
match out {
KademliaOut::Discovered { peer_id, addresses, ty } => {
self.custom_protocols.add_discovered_addrs(
&peer_id,
addresses.into_iter().map(|addr| (addr, ty == KadConnectionType::Connected))
);
}
KademliaOut::FindNodeResult { key, closer_peers } => {
trace!(target: "sub-libp2p", "Kademlia query for {:?} yielded {:?} results",
key, closer_peers.len());
@@ -284,24 +321,31 @@ impl<TSubstream> DiscoveryBehaviour<TSubstream> {
}
}
impl<TSubstream, TTopology> NetworkBehaviour<TTopology> for DiscoveryBehaviour<TSubstream>
impl<TSubstream> NetworkBehaviour for DiscoveryBehaviour<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TTopology: KademliaTopology,
{
type ProtocolsHandler = <Kademlia<TSubstream> as NetworkBehaviour<TTopology>>::ProtocolsHandler;
type OutEvent = <Kademlia<TSubstream> as NetworkBehaviour<TTopology>>::OutEvent;
type ProtocolsHandler = <Kademlia<TSubstream> as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = <Kademlia<TSubstream> as NetworkBehaviour>::OutEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
NetworkBehaviour::<TTopology>::new_handler(&mut self.kademlia)
NetworkBehaviour::new_handler(&mut self.kademlia)
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.kademlia.addresses_of_peer(peer_id)
}
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
NetworkBehaviour::<TTopology>::inject_connected(&mut self.kademlia, peer_id, endpoint)
NetworkBehaviour::inject_connected(&mut self.kademlia, peer_id, endpoint)
}
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
NetworkBehaviour::<TTopology>::inject_disconnected(&mut self.kademlia, peer_id, endpoint)
NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id, endpoint)
}
fn inject_replaced(&mut self, peer_id: PeerId, closed: ConnectedPoint, opened: ConnectedPoint) {
NetworkBehaviour::inject_replaced(&mut self.kademlia, peer_id, closed, opened)
}
fn inject_node_event(
@@ -309,12 +353,12 @@ where
peer_id: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
NetworkBehaviour::<TTopology>::inject_node_event(&mut self.kademlia, peer_id, event)
NetworkBehaviour::inject_node_event(&mut self.kademlia, peer_id, event)
}
fn poll(
&mut self,
params: &mut PollParameters<TTopology>,
params: &mut PollParameters,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
@@ -15,19 +15,23 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::custom_proto::handler::{CustomProtosHandler, CustomProtosHandlerOut, CustomProtosHandlerIn};
use crate::custom_proto::topology::NetTopology;
use crate::custom_proto::upgrade::RegisteredProtocols;
use crate::{NetworkConfiguration, NonReservedPeerMode, ProtocolId, topology::NetTopology};
use crate::{NetworkConfiguration, NonReservedPeerMode, ProtocolId};
use crate::parse_str_addr;
use bytes::Bytes;
use fnv::{FnvHashMap, FnvHashSet};
use futures::prelude::*;
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::core::{protocols_handler::ProtocolsHandler, PeerId};
use libp2p::core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
use log::{debug, trace, warn};
use smallvec::SmallVec;
use std::{io, marker::PhantomData, time::Duration, time::Instant};
use std::{cmp, error, io, marker::PhantomData, path::Path, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
// File where the network topology is stored.
const NODES_FILE: &str = "nodes.json";
// Duration during which a peer is disabled.
const PEER_DISABLE_DURATION: Duration = Duration::from_secs(5 * 60);
@@ -36,6 +40,9 @@ pub struct CustomProtos<TSubstream> {
/// List of protocols to open with peers. Never modified.
registered_protocols: RegisteredProtocols,
/// Topology of the network.
topology: NetTopology,
/// List of custom protocols that we have open with remotes.
open_protocols: Vec<(PeerId, ProtocolId)>,
@@ -56,6 +63,9 @@ pub struct CustomProtos<TSubstream> {
/// If true, only reserved peers can connect.
reserved_only: bool,
/// List of the IDs of the peers we are connected to.
connected_peers: FnvHashSet<PeerId>,
/// List of the IDs of the reserved peers. We always try to maintain a connection these peers.
reserved_peers: FnvHashSet<PeerId>,
@@ -122,7 +132,24 @@ pub enum CustomProtosOut {
impl<TSubstream> CustomProtos<TSubstream> {
/// Creates a `CustomProtos`.
pub fn new(config: &NetworkConfiguration, registered_protocols: RegisteredProtocols) -> Self {
pub fn new(config: &NetworkConfiguration, local_peer_id: &PeerId, registered_protocols: RegisteredProtocols) -> Self {
// Initialize the topology of the network.
let mut topology = if let Some(ref path) = config.net_config_path {
let path = Path::new(path).join(NODES_FILE);
debug!(target: "sub-libp2p", "Initializing peer store for JSON file {:?}", path);
NetTopology::from_file(local_peer_id.clone(), path)
} else {
debug!(target: "sub-libp2p", "No peers file configured ; peers won't be saved");
NetTopology::memory(local_peer_id.clone())
};
// Add the bootstrap nodes to the topology.
for bootnode in config.boot_nodes.iter() {
if let Ok((peer_id, addr)) = parse_str_addr(bootnode) {
topology.add_bootstrap_addr(&peer_id, addr.clone());
}
}
let max_incoming_connections = config.in_peers as usize;
let max_outgoing_connections = config.out_peers as usize;
@@ -136,9 +163,11 @@ impl<TSubstream> CustomProtos<TSubstream> {
CustomProtos {
registered_protocols,
topology,
max_incoming_connections,
max_outgoing_connections,
reserved_only: config.non_reserved_mode == NonReservedPeerMode::Deny,
connected_peers: Default::default(),
reserved_peers: Default::default(),
banned_peers: Vec::new(),
open_protocols: Vec::with_capacity(open_protos_cap),
@@ -150,7 +179,8 @@ impl<TSubstream> CustomProtos<TSubstream> {
}
/// Adds a reserved peer.
pub fn add_reserved_peer(&mut self, peer_id: PeerId) {
pub fn add_reserved_peer(&mut self, peer_id: PeerId, addr: Multiaddr) {
self.topology.add_bootstrap_addr(&peer_id, addr);
self.reserved_peers.insert(peer_id);
// Trigger a `connect_to_nodes` round.
@@ -240,11 +270,40 @@ impl<TSubstream> CustomProtos<TSubstream> {
});
}
/// Indicates to the topology that we have discovered new addresses for a given node.
pub fn add_discovered_addrs<I>(
&mut self,
peer_id: &PeerId,
addrs: I,
) where I: Iterator<Item = (Multiaddr, bool)> {
if self.topology.add_discovered_addrs(peer_id, addrs) {
// Trigger a `connect_to_nodes` round.
self.next_connect_to_nodes = Delay::new(Instant::now());
}
}
/// Returns the number of peers in the topology.
pub fn num_topology_peers(&self) -> usize {
self.topology.num_peers()
}
/// Flushes the topology to the disk.
pub fn flush_topology(&mut self) -> Result<(), io::Error> {
self.topology.flush_to_disk()
}
/// Perform a cleanup pass, removing all obsolete addresses and peers.
///
/// This should be done from time to time.
pub fn cleanup(&mut self) {
self.topology.cleanup();
}
/// Updates the attempted connections to nodes.
///
/// Also updates `next_connect_to_nodes` with the earliest known moment when we need to
/// update connections again.
fn connect_to_nodes(&mut self, params: &mut PollParameters<NetTopology>) {
fn connect_to_nodes(&mut self, params: &mut PollParameters) {
// Make sure we are connected or connecting to all the reserved nodes.
for reserved in self.reserved_peers.iter() {
// TODO: don't generate an event if we're already in a pending connection (https://github.com/libp2p/rust-libp2p/issues/697)
@@ -272,7 +331,7 @@ impl<TSubstream> CustomProtos<TSubstream> {
num_to_open);
let local_peer_id = params.local_peer_id().clone();
let (to_try, will_change) = params.topology().addrs_to_attempt();
let (to_try, will_change) = self.topology.addrs_to_attempt();
for (peer_id, _) in to_try {
if num_to_open == 0 {
break
@@ -282,6 +341,10 @@ impl<TSubstream> CustomProtos<TSubstream> {
continue
}
if self.connected_peers.contains(&peer_id) {
continue
}
if let Some((_, ban_end)) = self.banned_peers.iter().find(|(p, _)| p == peer_id) {
if *ban_end > Instant::now() {
continue
@@ -293,11 +356,11 @@ impl<TSubstream> CustomProtos<TSubstream> {
}
// Next round is when we expect the topology will change.
self.next_connect_to_nodes.reset(will_change);
self.next_connect_to_nodes.reset(cmp::min(will_change, Instant::now() + Duration::from_secs(60)));
}
}
impl<TSubstream> NetworkBehaviour<NetTopology> for CustomProtos<TSubstream>
impl<TSubstream> NetworkBehaviour for CustomProtos<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
@@ -308,13 +371,23 @@ where
CustomProtosHandler::new(self.registered_protocols.clone())
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.topology.addresses_of_peer(peer_id)
}
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
// When a peer connects, its handler is initially in the disabled state. We make sure that
// the peer is allowed, and if so we put it in the enabled state.
self.connected_peers.insert(peer_id.clone());
let is_reserved = self.reserved_peers.contains(&peer_id);
if self.reserved_only && !is_reserved {
debug!(target: "sub-libp2p", "Ignoring {:?} because we're in reserved mode", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Disable,
});
return
}
@@ -323,6 +396,10 @@ where
if let Some((_, expire)) = self.banned_peers.iter().find(|(p, _)| p == &peer_id) {
if *expire >= Instant::now() {
debug!(target: "sub-libp2p", "Ignoring banned peer {:?}", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Disable,
});
return
}
}
@@ -338,6 +415,10 @@ where
debug_assert!(num_outgoing <= self.max_outgoing_connections);
if num_outgoing == self.max_outgoing_connections {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Disable,
});
return
}
}
@@ -351,6 +432,10 @@ where
if num_ingoing == self.max_incoming_connections {
debug!(target: "sub-libp2p", "Ignoring incoming connection from {:?} because \
we're full", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Disable,
});
return
}
}
@@ -374,10 +459,16 @@ where
});
}
self.topology.set_connected(&peer_id, &endpoint);
self.enabled_peers.insert(peer_id, endpoint);
}
fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) {
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
let was_connected = self.connected_peers.remove(&peer_id);
debug_assert!(was_connected);
self.topology.set_disconnected(peer_id, &endpoint);
while let Some(pos) = self.open_protocols.iter().position(|(p, _)| p == peer_id) {
let (_, protocol_id) = self.open_protocols.remove(pos);
@@ -388,12 +479,24 @@ where
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
}
// Trigger a `connect_to_nodes` round.
self.next_connect_to_nodes = Delay::new(Instant::now());
self.enabled_peers.remove(peer_id);
}
fn inject_dial_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn error::Error) {
if let Some(peer_id) = peer_id.as_ref() {
debug!(target: "sub-libp2p", "Failed to reach peer {:?} through {} => {:?}", peer_id, addr, error);
if self.connected_peers.contains(peer_id) {
self.topology.set_unreachable(addr);
}
// Trigger a `connect_to_nodes` round.
self.next_connect_to_nodes = Delay::new(Instant::now());
}
self.enabled_peers.remove(peer_id);
}
fn inject_node_event(
@@ -461,7 +564,7 @@ where
fn poll(
&mut self,
params: &mut PollParameters<NetTopology>,
params: &mut PollParameters,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
@@ -19,13 +19,14 @@ use crate::custom_proto::upgrade::{RegisteredProtocol, RegisteredProtocols, Regi
use bytes::Bytes;
use futures::prelude::*;
use libp2p::core::{
Endpoint, ProtocolsHandler, ProtocolsHandlerEvent,
ProtocolsHandler, ProtocolsHandlerEvent,
protocols_handler::KeepAlive,
protocols_handler::ProtocolsHandlerUpgrErr,
upgrade::{InboundUpgrade, OutboundUpgrade}
};
use log::{trace, warn};
use smallvec::SmallVec;
use std::{fmt, io};
use std::{fmt, io, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use void::Void;
@@ -42,6 +43,9 @@ pub struct CustomProtosHandler<TSubstream> {
/// See the documentation of `State`.
state: State,
/// Value to be returned by `connection_keep_alive()`.
keep_alive: KeepAlive,
/// The active substreams. There should always ever be only one substream per protocol.
substreams: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>,
@@ -130,7 +134,9 @@ where
pub fn new(protocols: RegisteredProtocols) -> Self {
CustomProtosHandler {
protocols,
state: State::Disabled,
// We keep the connection alive for at least 5 seconds, waiting for what happens.
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(5)),
state: State::Normal,
substreams: SmallVec::new(),
events_queue: SmallVec::new(),
}
@@ -140,9 +146,10 @@ where
fn inject_fully_negotiated(
&mut self,
proto: RegisteredProtocolSubstream<TSubstream>,
_: Endpoint,
) {
match self.state {
// TODO: we should shut down refused substreams gracefully; this should be fixed
// at the same time as https://github.com/paritytech/substrate/issues/1517
State::Disabled | State::ShuttingDown => return,
State::Normal => ()
}
@@ -183,7 +190,7 @@ where
&mut self,
proto: <Self::InboundProtocol as InboundUpgrade<TSubstream>>::Output
) {
self.inject_fully_negotiated(proto, Endpoint::Listener);
self.inject_fully_negotiated(proto);
}
#[inline]
@@ -192,7 +199,7 @@ where
proto: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output,
_: Self::OutboundOpenInfo
) {
self.inject_fully_negotiated(proto, Endpoint::Dialer);
self.inject_fully_negotiated(proto);
}
fn inject_event(&mut self, message: CustomProtosHandlerIn) {
@@ -203,6 +210,7 @@ where
State::Disabled | State::ShuttingDown => (),
}
self.keep_alive = KeepAlive::Now;
for substream in self.substreams.iter_mut() {
substream.shutdown();
}
@@ -213,6 +221,8 @@ where
State::Normal | State::ShuttingDown => (),
}
self.keep_alive = KeepAlive::Forever;
// Try open one substream for each registered protocol.
if let CustomProtosHandlerIn::EnableActive = message {
for protocol in self.protocols.0.iter() {
@@ -253,14 +263,16 @@ where
#[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<io::Error>) {
warn!(target: "sub-libp2p", "Error while opening custom protocol: {:?}", err);
}
#[inline]
fn connection_keep_alive(&self) -> bool {
// Right now if the remote doesn't support one of the custom protocols, we shut down the
// entire connection. This is a hack-ish solution to the problem where we connect to nodes
// that support libp2p but not the testnet that we want.
self.substreams.len() == self.protocols.len()
self.shutdown();
}
#[inline]
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
fn shutdown(&mut self) {
@@ -19,4 +19,5 @@ pub use self::upgrade::{RegisteredProtocol, RegisteredProtocols};
mod behaviour;
mod handler;
mod topology;
mod upgrade;
@@ -15,12 +15,10 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use fnv::FnvHashMap;
use libp2p::{Multiaddr, PeerId, identify::IdentifyTopology, multihash::Multihash};
use libp2p::core::{PublicKey, swarm::ConnectedPoint, topology::DisconnectReason, topology::Topology};
use libp2p::kad::{KBucketsPeerId, KadConnectionType, KademliaTopology};
use libp2p::{core::swarm::ConnectedPoint, Multiaddr, PeerId};
use log::{debug, info, trace, warn};
use serde_derive::{Serialize, Deserialize};
use std::{cmp, fs, iter, vec};
use std::{cmp, fs};
use std::io::{Read, Cursor, Error as IoError, ErrorKind as IoErrorKind, Write, BufReader, BufWriter};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant, SystemTime};
@@ -58,8 +56,6 @@ const FAIL_BACKOFF_MULTIPLIER: u32 = 2;
/// We need a maximum value for the backoff, overwise we risk an overflow.
const MAX_BACKOFF: Duration = Duration::from_secs(30 * 60);
// TODO: should be merged with the Kademlia k-buckets
/// Stores information about the topology of the network.
#[derive(Debug)]
pub struct NetTopology {
@@ -67,12 +63,8 @@ pub struct NetTopology {
store: FnvHashMap<PeerId, PeerInfo>,
/// Optional path to the file that caches the serialized version of `store`.
cache_path: Option<PathBuf>,
/// Public key of the local node.
local_public_key: PublicKey,
/// PeerId of the local node. Derived from `local_public_key`.
/// PeerId of the local node.
local_peer_id: PeerId,
/// Known addresses for the local node to report to the network.
external_addresses: Vec<Multiaddr>,
}
impl NetTopology {
@@ -80,14 +72,11 @@ impl NetTopology {
///
/// `flush_to_disk()` will be a no-op.
#[inline]
pub fn memory(local_public_key: PublicKey) -> NetTopology {
let local_peer_id = local_public_key.clone().into_peer_id();
pub fn memory(local_peer_id: PeerId) -> NetTopology {
NetTopology {
store: Default::default(),
cache_path: None,
local_peer_id,
local_public_key,
external_addresses: Vec::new(),
}
}
@@ -97,17 +86,14 @@ impl NetTopology {
/// or contains garbage data, the execution still continues.
///
/// Calling `flush_to_disk()` in the future writes to the given path.
pub fn from_file<P: AsRef<Path>>(local_public_key: PublicKey, path: P) -> NetTopology {
pub fn from_file<P: AsRef<Path>>(local_peer_id: PeerId, path: P) -> NetTopology {
let path = path.as_ref();
let local_peer_id = local_public_key.clone().into_peer_id();
debug!(target: "sub-libp2p", "Initializing peer store for JSON file {:?}", path);
let store = try_load(path, &local_peer_id);
NetTopology {
store,
cache_path: Some(path.to_owned()),
local_peer_id,
local_public_key,
external_addresses: Vec::new(),
}
}
@@ -146,12 +132,6 @@ impl NetTopology {
});
}
/// Add the external addresses that are known for the local node.
pub fn add_external_addrs<TIter>(&mut self, addrs: TIter)
where TIter: Iterator<Item = Multiaddr> {
self.external_addresses.extend(addrs);
}
/// Returns a list of all the known addresses of peers, ordered by the
/// order in which we should attempt to connect to them.
///
@@ -200,6 +180,10 @@ impl NetTopology {
///
/// We assume that the address is valid, so its score starts very high.
pub fn add_bootstrap_addr(&mut self, peer: &PeerId, addr: Multiaddr) {
if *peer == self.local_peer_id {
return
}
let now_systime = SystemTime::now();
let now = Instant::now();
@@ -235,16 +219,22 @@ impl NetTopology {
}
}
/// Inner implementaiton of the `add_*_discovered_addrs` methods.
/// Indicates the topology that we have discovered new addresses for a given node.
///
/// Returns `true` if the topology has changed in some way. Returns `false` if calling this
/// method was a no-op.
fn add_discovered_addrs<I>(
pub fn add_discovered_addrs<I>(
&mut self,
peer_id: &PeerId,
addrs: I,
) -> bool
where I: Iterator<Item = (Multiaddr, bool)> {
if *peer_id == self.local_peer_id {
return false
}
let mut addrs: Vec<_> = addrs.collect();
let now_systime = SystemTime::now();
let now = Instant::now();
@@ -252,14 +242,14 @@ impl NetTopology {
let new_addrs = peer.addrs
.drain(..)
.filter_map(|a| {
.filter(|a| {
if a.expires < now_systime && !a.is_connected() {
return None
return false
}
if let Some(pos) = addrs.iter().position(|&(ref addr, _)| addr == &a.addr) {
while let Some(pos) = addrs.iter().position(|&(ref addr, _)| addr == &a.addr) {
addrs.remove(pos);
}
Some(a)
true
})
.collect();
peer.addrs = new_addrs;
@@ -267,6 +257,7 @@ impl NetTopology {
let mut anything_changed = false;
if !addrs.is_empty() {
anything_changed = true;
trace!(
target: "sub-libp2p",
"Peer store: adding addresses {:?} for {:?}",
@@ -292,7 +283,11 @@ impl NetTopology {
}
}
anything_changed = true;
// `addrs` can contain duplicates, therefore we would insert the same address twice.
if peer.addrs.iter().any(|a| a.addr == addr) {
continue;
}
peer.addrs.push(Addr {
addr,
expires: now_systime + KADEMLIA_DISCOVERY_EXPIRATION,
@@ -308,58 +303,10 @@ impl NetTopology {
anything_changed
}
}
impl KademliaTopology for NetTopology {
type ClosestPeersIter = vec::IntoIter<PeerId>;
type GetProvidersIter = iter::Empty<PeerId>;
fn add_kad_discovered_address(&mut self, peer: PeerId, addr: Multiaddr, ty: KadConnectionType) {
self.add_discovered_addrs(&peer, iter::once((addr, ty == KadConnectionType::Connected)));
}
fn closest_peers(&mut self, target: &Multihash, _max: usize) -> Self::ClosestPeersIter {
// TODO: very inefficient
let mut peers = self.store.keys().cloned().collect::<Vec<_>>();
peers.push(self.local_peer_id.clone());
peers.sort_by(|a, b| {
b.as_ref().distance_with(target).cmp(&a.as_ref().distance_with(target))
});
peers.into_iter()
}
fn add_provider(&mut self, _: Multihash, _: PeerId) {
// We don't implement ADD_PROVIDER/GET_PROVIDERS
}
fn get_providers(&mut self, _: &Multihash) -> Self::GetProvidersIter {
// We don't implement ADD_PROVIDER/GET_PROVIDERS
iter::empty()
}
}
impl IdentifyTopology for NetTopology {
/// Returns the addresses stored for a specific peer.
#[inline]
fn add_identify_discovered_addrs<TIter>(&mut self, peer: &PeerId, addrs: TIter)
where
TIter: Iterator<Item = Multiaddr>
{
// These are addresses that peers indicate for themselves.
// The typical use case is:
// - A peer connects to one of our listening points.
// - We send an identify request to it, and it answers with a list of addresses.
// - If later it disconnects, we can try to dial it back through one of these addresses.
self.add_discovered_addrs(peer, addrs.map(move |a| (a, true)));
}
}
impl Topology for NetTopology {
#[inline]
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
if peer == &self.local_peer_id {
return self.external_addresses.clone()
}
pub fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
let peer = if let Some(peer) = self.store.get_mut(peer) {
peer
} else {
@@ -382,20 +329,12 @@ impl Topology for NetTopology {
list.into_iter().map(|(_, addr)| addr.clone()).collect::<Vec<_>>()
}
fn add_local_external_addrs<TIter>(&mut self, addrs: TIter)
where TIter: Iterator<Item = Multiaddr> {
self.add_external_addrs(addrs)
}
/// Marks the given peer as connected through the given endpoint.
pub fn set_connected(&mut self, peer: &PeerId, endpoint: &ConnectedPoint) {
if *peer == self.local_peer_id {
return
}
fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
}
fn local_public_key(&self) -> &PublicKey {
&self.local_public_key
}
fn set_connected(&mut self, peer: &PeerId, endpoint: &ConnectedPoint) {
let addr = match endpoint {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Listener { .. } => return
@@ -438,17 +377,16 @@ impl Topology for NetTopology {
}
}
fn set_disconnected(&mut self, _: &PeerId, endpoint: &ConnectedPoint, reason: DisconnectReason) {
/// Marks the given peer as disconnected. The endpoint is the one we were connected to.
pub fn set_disconnected(&mut self, _: &PeerId, endpoint: &ConnectedPoint) {
let addr = match endpoint {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Listener { .. } => return
};
let score_diff = match reason {
DisconnectReason::Replaced => -3,
DisconnectReason::Graceful => -1,
DisconnectReason::Error => -5,
};
// Note that we used to have different score values here in the past, but there really
// isn't much point in doing so in practice.
let score_diff = -3;
for info in self.store.values_mut() {
for a in info.addrs.iter_mut() {
@@ -466,13 +404,15 @@ impl Topology for NetTopology {
}
}
fn set_unreachable(&mut self, addr: &Multiaddr) {
/// Indicates to the topology that we failed to reach a node when dialing the given address.
pub fn set_unreachable(&mut self, addr: &Multiaddr) {
for info in self.store.values_mut() {
for a in info.addrs.iter_mut() {
if &a.addr != addr {
continue
}
debug_assert!(!a.is_connected());
a.adjust_score(SCORE_DIFF_ON_FAILED_TO_CONNECT);
trace!(target: "sub-libp2p", "Back off for {} = {:?}", addr, a.next_back_off);
a.back_off_until = Instant::now() + a.next_back_off;
-1
View File
@@ -21,7 +21,6 @@ mod custom_proto;
mod error;
mod secret;
mod service_task;
mod topology;
mod traits;
mod transport;
@@ -19,7 +19,6 @@ use crate::{
transport
};
use crate::custom_proto::{RegisteredProtocol, RegisteredProtocols};
use crate::topology::NetTopology;
use crate::{Error, NetworkConfiguration, NodeIndex, ProtocolId, parse_str_addr};
use bytes::Bytes;
use fnv::FnvHashMap;
@@ -37,9 +36,6 @@ use std::sync::Arc;
use std::time::Duration;
use tokio_timer::Interval;
// File where the network topology is stored.
const NODES_FILE: &str = "nodes.json";
/// Starts the substrate libp2p service.
///
/// Returns a stream that must be polled regularly in order for the networking to function.
@@ -58,25 +54,12 @@ where TProtos: IntoIterator<Item = RegisteredProtocol> {
let local_public_key = local_private_key.to_public_key();
let local_peer_id = local_public_key.clone().into_peer_id();
// Initialize the topology of the network.
let mut topology = if let Some(ref path) = config.net_config_path {
let path = Path::new(path).join(NODES_FILE);
debug!(target: "sub-libp2p", "Initializing peer store for JSON file {:?}", path);
NetTopology::from_file(local_public_key, path)
} else {
debug!(target: "sub-libp2p", "No peers file configured ; peers won't be saved");
NetTopology::memory(local_public_key)
};
// Register the external addresses provided by the user as our own.
topology.add_external_addrs(config.public_addresses.clone().into_iter());
// Build the swarm.
let (mut swarm, bandwidth) = {
let registered_custom = RegisteredProtocols(registered_custom.into_iter().collect());
let behaviour = Behaviour::new(&config, local_peer_id.clone(), registered_custom);
let behaviour = Behaviour::new(&config, local_public_key.clone(), registered_custom);
let (transport, bandwidth) = transport::build_transport(local_private_key);
(Swarm::new(transport, behaviour, topology), bandwidth)
(Swarm::new(transport, behaviour, local_peer_id.clone()), bandwidth)
};
// Listen on multiaddresses.
@@ -90,11 +73,15 @@ where TProtos: IntoIterator<Item = RegisteredProtocol> {
}
}
// Add the bootstrap nodes to the topology and connect to them.
// Add external addresses.
for addr in &config.public_addresses {
Swarm::add_external_address(&mut swarm, addr.clone());
}
// Connect to the bootnodes.
for bootnode in config.boot_nodes.iter() {
match parse_str_addr(bootnode) {
Ok((peer_id, addr)) => {
Swarm::topology_mut(&mut swarm).add_bootstrap_addr(&peer_id, addr.clone());
Ok((peer_id, _)) => {
Swarm::dial(&mut swarm, peer_id);
},
Err(_) => {
@@ -121,8 +108,7 @@ where TProtos: IntoIterator<Item = RegisteredProtocol> {
// Initialize the reserved peers.
for reserved in config.reserved_nodes.iter() {
if let Ok((peer_id, addr)) = parse_str_addr(reserved) {
Swarm::topology_mut(&mut swarm).add_bootstrap_addr(&peer_id, addr);
swarm.add_reserved_peer(peer_id.clone());
swarm.add_reserved_peer(peer_id.clone(), addr);
Swarm::dial(&mut swarm, peer_id);
} else {
warn!(target: "sub-libp2p", "Not a valid reserved node address: {}", reserved);
@@ -130,7 +116,7 @@ where TProtos: IntoIterator<Item = RegisteredProtocol> {
}
debug!(target: "sub-libp2p", "Topology started with {} entries",
Swarm::topology_mut(&mut swarm).num_peers());
swarm.num_topology_peers());
Ok(Service {
swarm,
@@ -204,7 +190,7 @@ pub enum ServiceEvent {
/// Network service. Must be polled regularly in order for the networking to work.
pub struct Service {
/// Stream of events of the swarm.
swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), IoError>, Behaviour<Substream<StreamMuxerBox>>, NetTopology>,
swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), IoError>, Behaviour<Substream<StreamMuxerBox>>>,
/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
bandwidth: Arc<transport::BandwidthSinks>,
@@ -270,8 +256,7 @@ impl Service {
/// Try to add a reserved peer.
pub fn add_reserved_peer(&mut self, peer_id: PeerId, addr: Multiaddr) {
Swarm::topology_mut(&mut self.swarm).add_bootstrap_addr(&peer_id, addr);
self.swarm.add_reserved_peer(peer_id);
self.swarm.add_reserved_peer(peer_id, addr);
}
/// Try to remove a reserved peer.
@@ -452,13 +437,13 @@ impl Service {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(Some(_))) => {
debug!(target: "sub-libp2p", "Cleaning and flushing topology");
Swarm::topology_mut(&mut self.swarm).cleanup();
if let Err(err) = Swarm::topology_mut(&mut self.swarm).flush_to_disk() {
self.swarm.cleanup();
if let Err(err) = self.swarm.flush_topology() {
warn!(target: "sub-libp2p", "Failed to flush topology: {:?}", err);
}
debug!(target: "sub-libp2p", "Topology now contains {} nodes",
Swarm::topology_mut(&mut self.swarm).num_peers());
},
self.swarm.num_topology_peers());
}
Ok(Async::Ready(None)) => {
warn!(target: "sub-libp2p", "Topology flush stream ended unexpectedly");
return Ok(Async::Ready(None))
@@ -474,7 +459,7 @@ impl Service {
impl Drop for Service {
fn drop(&mut self) {
if let Err(err) = Swarm::topology_mut(&mut self.swarm).flush_to_disk() {
if let Err(err) = self.swarm.flush_topology() {
warn!(target: "sub-libp2p", "Failed to flush topology: {:?}", err);
}
}