mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 16:51:03 +00:00
Keep node information about disconnection (#2596)
* Keep node information about disconnection * Fix line widths
This commit is contained in:
committed by
Gavin Wood
parent
2893a613c2
commit
c4e3970d9f
@@ -22,37 +22,36 @@ use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourActi
|
|||||||
use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters};
|
use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters};
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
use libp2p::core::swarm::toggle::Toggle;
|
use libp2p::core::swarm::toggle::Toggle;
|
||||||
use libp2p::identify::{Identify, IdentifyEvent, protocol::IdentifyInfo};
|
|
||||||
use libp2p::kad::{Kademlia, KademliaOut};
|
use libp2p::kad::{Kademlia, KademliaOut};
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
use libp2p::mdns::{Mdns, MdnsEvent};
|
use libp2p::mdns::{Mdns, MdnsEvent};
|
||||||
use libp2p::multiaddr::Protocol;
|
use libp2p::multiaddr::Protocol;
|
||||||
use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess};
|
|
||||||
use log::{debug, info, trace, warn};
|
use log::{debug, info, trace, warn};
|
||||||
use std::{cmp, iter, time::Duration};
|
use std::{cmp, iter, time::Duration};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
use tokio_timer::{Delay, clock::Clock};
|
use tokio_timer::{Delay, clock::Clock};
|
||||||
use void;
|
use void;
|
||||||
|
|
||||||
|
mod debug_info;
|
||||||
|
|
||||||
/// General behaviour of the network.
|
/// General behaviour of the network.
|
||||||
#[derive(NetworkBehaviour)]
|
#[derive(NetworkBehaviour)]
|
||||||
#[behaviour(out_event = "BehaviourOut<TBehaviourEv>", poll_method = "poll")]
|
#[behaviour(out_event = "TBehaviourEv", poll_method = "poll")]
|
||||||
pub struct Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
pub struct Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||||
/// Main protocol that handles everything except the discovery and the technicalities.
|
/// Main protocol that handles everything except the discovery and the technicalities.
|
||||||
user_protocol: UserBehaviourWrap<TBehaviour>,
|
user_protocol: UserBehaviourWrap<TBehaviour>,
|
||||||
/// Periodically ping nodes, and close the connection if it's unresponsive.
|
/// Periodically pings and identifies the nodes we are connected to, and store information in a
|
||||||
ping: Ping<TSubstream>,
|
/// cache.
|
||||||
|
debug_info: debug_info::DebugInfoBehaviour<TSubstream>,
|
||||||
/// Discovers nodes of the network. Defined below.
|
/// Discovers nodes of the network. Defined below.
|
||||||
discovery: DiscoveryBehaviour<TSubstream>,
|
discovery: DiscoveryBehaviour<TSubstream>,
|
||||||
/// Periodically identifies the remote and responds to incoming requests.
|
|
||||||
identify: Identify<TSubstream>,
|
|
||||||
/// Discovers nodes on the local network.
|
/// Discovers nodes on the local network.
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
mdns: Toggle<Mdns<TSubstream>>,
|
mdns: Toggle<Mdns<TSubstream>>,
|
||||||
|
|
||||||
/// Queue of events to produce for the outside.
|
/// Queue of events to produce for the outside.
|
||||||
#[behaviour(ignore)]
|
#[behaviour(ignore)]
|
||||||
events: Vec<BehaviourOut<TBehaviourEv>>,
|
events: Vec<TBehaviourEv>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||||
@@ -64,10 +63,7 @@ impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, T
|
|||||||
known_addresses: Vec<(PeerId, Multiaddr)>,
|
known_addresses: Vec<(PeerId, Multiaddr)>,
|
||||||
enable_mdns: bool,
|
enable_mdns: bool,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let identify = {
|
let debug_info = debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone());
|
||||||
let proto_version = "/substrate/1.0".to_string();
|
|
||||||
Identify::new(proto_version, user_agent, local_public_key.clone())
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id());
|
let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id());
|
||||||
for (peer_id, addr) in &known_addresses {
|
for (peer_id, addr) in &known_addresses {
|
||||||
@@ -82,7 +78,7 @@ impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, T
|
|||||||
let clock = Clock::new();
|
let clock = Clock::new();
|
||||||
Behaviour {
|
Behaviour {
|
||||||
user_protocol: UserBehaviourWrap(user_protocol),
|
user_protocol: UserBehaviourWrap(user_protocol),
|
||||||
ping: Ping::new(PingConfig::new()),
|
debug_info,
|
||||||
discovery: DiscoveryBehaviour {
|
discovery: DiscoveryBehaviour {
|
||||||
user_defined: known_addresses,
|
user_defined: known_addresses,
|
||||||
kademlia,
|
kademlia,
|
||||||
@@ -91,7 +87,6 @@ impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, T
|
|||||||
clock,
|
clock,
|
||||||
local_peer_id: local_public_key.into_peer_id(),
|
local_peer_id: local_public_key.into_peer_id(),
|
||||||
},
|
},
|
||||||
identify,
|
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
mdns: if enable_mdns {
|
mdns: if enable_mdns {
|
||||||
match Mdns::new() {
|
match Mdns::new() {
|
||||||
@@ -120,6 +115,15 @@ impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, T
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Borrows `self` and returns a struct giving access to the information about a node.
|
||||||
|
///
|
||||||
|
/// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes
|
||||||
|
/// we're connected to, meaning that if `None` is returned then we're not connected to that
|
||||||
|
/// node.
|
||||||
|
pub fn node(&self, peer_id: &PeerId) -> Option<debug_info::Node> {
|
||||||
|
self.debug_info.node(peer_id)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns a shared reference to the user protocol.
|
/// Returns a shared reference to the user protocol.
|
||||||
pub fn user_protocol(&self) -> &TBehaviour {
|
pub fn user_protocol(&self) -> &TBehaviour {
|
||||||
&self.user_protocol.0
|
&self.user_protocol.0
|
||||||
@@ -131,73 +135,39 @@ impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, T
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Event that can be emitted by the behaviour.
|
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<void::Void> for
|
||||||
#[derive(Debug)]
|
Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||||
pub enum BehaviourOut<TBehaviourEv> {
|
|
||||||
/// Message from the user protocol.
|
|
||||||
UserProtocol(TBehaviourEv),
|
|
||||||
|
|
||||||
/// We have obtained debug information from a peer.
|
|
||||||
Identified {
|
|
||||||
/// Id of the peer that has been identified.
|
|
||||||
peer_id: PeerId,
|
|
||||||
/// Information about the peer.
|
|
||||||
info: IdentifyInfo,
|
|
||||||
},
|
|
||||||
|
|
||||||
/// We have successfully pinged a peer.
|
|
||||||
PingSuccess {
|
|
||||||
/// Id of the peer that has been pinged.
|
|
||||||
peer_id: PeerId,
|
|
||||||
/// Time it took for the ping to come back.
|
|
||||||
ping_time: Duration,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
|
||||||
fn inject_event(&mut self, event: void::Void) {
|
fn inject_event(&mut self, event: void::Void) {
|
||||||
void::unreachable(event)
|
void::unreachable(event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<UserEventWrap<TBehaviourEv>> for Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<UserEventWrap<TBehaviourEv>> for
|
||||||
|
Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||||
fn inject_event(&mut self, event: UserEventWrap<TBehaviourEv>) {
|
fn inject_event(&mut self, event: UserEventWrap<TBehaviourEv>) {
|
||||||
self.events.push(BehaviourOut::UserProtocol(event.0));
|
self.events.push(event.0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent>
|
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<debug_info::DebugInfoEvent>
|
||||||
for Behaviour<TBehaviour, TBehaviourEv, TSubstream>
|
for Behaviour<TBehaviour, TBehaviourEv, TSubstream>
|
||||||
where TBehaviour: DiscoveryNetBehaviour {
|
where TBehaviour: DiscoveryNetBehaviour {
|
||||||
fn inject_event(&mut self, event: IdentifyEvent) {
|
fn inject_event(&mut self, event: debug_info::DebugInfoEvent) {
|
||||||
match event {
|
let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event;
|
||||||
IdentifyEvent::Identified { peer_id, mut info, .. } => {
|
if !info.protocol_version.contains("substrate") {
|
||||||
trace!(target: "sub-libp2p", "Identified {:?} => {:?}", peer_id, info);
|
warn!(target: "sub-libp2p", "Connected to a non-Substrate node: {:?}", info);
|
||||||
// TODO: ideally we would delay the first identification to when we open the custom
|
|
||||||
// protocol, so that we only report id info to the service about the nodes we
|
|
||||||
// care about (https://github.com/libp2p/rust-libp2p/issues/876)
|
|
||||||
if !info.protocol_version.contains("substrate") {
|
|
||||||
warn!(target: "sub-libp2p", "Connected to a non-Substrate node: {:?}", info);
|
|
||||||
}
|
|
||||||
if info.listen_addrs.len() > 30 {
|
|
||||||
warn!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \
|
|
||||||
it is identified by {:?} and {:?}", peer_id, info.protocol_version,
|
|
||||||
info.agent_version
|
|
||||||
);
|
|
||||||
info.listen_addrs.truncate(30);
|
|
||||||
}
|
|
||||||
for addr in &info.listen_addrs {
|
|
||||||
self.discovery.kademlia.add_connected_address(&peer_id, addr.clone());
|
|
||||||
}
|
|
||||||
self.user_protocol.0.add_discovered_nodes(iter::once(peer_id.clone()));
|
|
||||||
self.events.push(BehaviourOut::Identified { peer_id, info });
|
|
||||||
}
|
|
||||||
IdentifyEvent::Error { .. } => {}
|
|
||||||
IdentifyEvent::SendBack { result: Err(ref err), ref peer_id } =>
|
|
||||||
debug!(target: "sub-libp2p", "Error when sending back identify info \
|
|
||||||
to {:?} => {}", peer_id, err),
|
|
||||||
IdentifyEvent::SendBack { .. } => {}
|
|
||||||
}
|
}
|
||||||
|
if info.listen_addrs.len() > 30 {
|
||||||
|
warn!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \
|
||||||
|
it is identified by {:?} and {:?}", peer_id, info.protocol_version,
|
||||||
|
info.agent_version
|
||||||
|
);
|
||||||
|
info.listen_addrs.truncate(30);
|
||||||
|
}
|
||||||
|
for addr in &info.listen_addrs {
|
||||||
|
self.discovery.kademlia.add_connected_address(&peer_id, addr.clone());
|
||||||
|
}
|
||||||
|
self.user_protocol.0.add_discovered_nodes(iter::once(peer_id.clone()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -224,18 +194,6 @@ impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<Kademlia
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
|
||||||
fn inject_event(&mut self, event: PingEvent) {
|
|
||||||
match event {
|
|
||||||
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
|
|
||||||
trace!(target: "sub-libp2p", "Ping time with {:?}: {:?}", peer, rtt);
|
|
||||||
self.events.push(BehaviourOut::PingSuccess { peer_id: peer, ping_time: rtt });
|
|
||||||
}
|
|
||||||
_ => ()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<MdnsEvent> for
|
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<MdnsEvent> for
|
||||||
Behaviour<TBehaviour, TBehaviourEv, TSubstream>
|
Behaviour<TBehaviour, TBehaviourEv, TSubstream>
|
||||||
@@ -251,7 +209,7 @@ impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<MdnsEven
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||||
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut<TBehaviourEv>>> {
|
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, TBehaviourEv>> {
|
||||||
if !self.events.is_empty() {
|
if !self.events.is_empty() {
|
||||||
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
|
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
|
||||||
}
|
}
|
||||||
@@ -279,16 +237,21 @@ impl<TInner: NetworkBehaviour> NetworkBehaviour for UserBehaviourWrap<TInner> {
|
|||||||
self.0.inject_disconnected(peer_id, endpoint)
|
self.0.inject_disconnected(peer_id, endpoint)
|
||||||
}
|
}
|
||||||
fn inject_node_event(
|
fn inject_node_event(
|
||||||
&mut self,
|
&mut self,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
|
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
|
||||||
) {
|
) {
|
||||||
self.0.inject_node_event(peer_id, event)
|
self.0.inject_node_event(peer_id, event)
|
||||||
}
|
}
|
||||||
fn poll(
|
fn poll(
|
||||||
&mut self,
|
&mut self,
|
||||||
params: &mut PollParameters
|
params: &mut PollParameters
|
||||||
) -> Async<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>> {
|
) -> Async<
|
||||||
|
NetworkBehaviourAction<
|
||||||
|
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
|
||||||
|
Self::OutEvent
|
||||||
|
>
|
||||||
|
> {
|
||||||
match self.0.poll(params) {
|
match self.0.poll(params) {
|
||||||
Async::NotReady => Async::NotReady,
|
Async::NotReady => Async::NotReady,
|
||||||
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) =>
|
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) =>
|
||||||
|
|||||||
@@ -0,0 +1,329 @@
|
|||||||
|
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Substrate.
|
||||||
|
|
||||||
|
// Substrate is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Substrate is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
use fnv::FnvHashMap;
|
||||||
|
use futures::prelude::*;
|
||||||
|
use libp2p::Multiaddr;
|
||||||
|
use libp2p::core::{either::EitherOutput, PeerId, PublicKey};
|
||||||
|
use libp2p::core::protocols_handler::{IntoProtocolsHandler, IntoProtocolsHandlerSelect, ProtocolsHandler};
|
||||||
|
use libp2p::core::nodes::ConnectedPoint;
|
||||||
|
use libp2p::core::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
|
||||||
|
use libp2p::identify::{Identify, IdentifyEvent, protocol::IdentifyInfo};
|
||||||
|
use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess};
|
||||||
|
use log::{debug, trace, error};
|
||||||
|
use std::collections::hash_map::Entry;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
use tokio_timer::Interval;
|
||||||
|
|
||||||
|
/// Time after we disconnect from a node before we purge its information from the cache.
|
||||||
|
const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60);
|
||||||
|
/// Interval at which we perform garbage collection on the node info.
|
||||||
|
const GARBAGE_COLLECT_INTERVAL: Duration = Duration::from_secs(2 * 60);
|
||||||
|
|
||||||
|
/// Implementation of `NetworkBehaviour` that holds information about nodes in cache for diagnostic
|
||||||
|
/// purposes.
|
||||||
|
pub struct DebugInfoBehaviour<TSubstream> {
|
||||||
|
/// Periodically ping nodes, and close the connection if it's unresponsive.
|
||||||
|
ping: Ping<TSubstream>,
|
||||||
|
/// Periodically identifies the remote and responds to incoming requests.
|
||||||
|
identify: Identify<TSubstream>,
|
||||||
|
/// Information that we know about all nodes.
|
||||||
|
nodes_info: FnvHashMap<PeerId, NodeInfo>,
|
||||||
|
/// Interval at which we perform garbage collection in `nodes_info`.
|
||||||
|
garbage_collect: Interval,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Information about a node we're connected to.
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct NodeInfo {
|
||||||
|
/// When we will remove the entry about this node from the list, or `None` if we're connected
|
||||||
|
/// to the node.
|
||||||
|
info_expire: Option<Instant>,
|
||||||
|
/// How we're connected to the node.
|
||||||
|
endpoint: ConnectedPoint,
|
||||||
|
/// Version reported by the remote, or `None` if unknown.
|
||||||
|
client_version: Option<String>,
|
||||||
|
/// Latest ping time with this node.
|
||||||
|
latest_ping: Option<Duration>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TSubstream> DebugInfoBehaviour<TSubstream> {
|
||||||
|
/// Builds a new `DebugInfoBehaviour`.
|
||||||
|
pub fn new(
|
||||||
|
user_agent: String,
|
||||||
|
local_public_key: PublicKey,
|
||||||
|
) -> Self {
|
||||||
|
let identify = {
|
||||||
|
let proto_version = "/substrate/1.0".to_string();
|
||||||
|
Identify::new(proto_version, user_agent, local_public_key.clone())
|
||||||
|
};
|
||||||
|
|
||||||
|
DebugInfoBehaviour {
|
||||||
|
ping: Ping::new(PingConfig::new()),
|
||||||
|
identify,
|
||||||
|
nodes_info: FnvHashMap::default(),
|
||||||
|
garbage_collect: Interval::new_interval(GARBAGE_COLLECT_INTERVAL),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Borrows `self` and returns a struct giving access to the information about a node.
|
||||||
|
///
|
||||||
|
/// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes
|
||||||
|
/// we're connected to, meaning that if `None` is returned then we're not connected to that
|
||||||
|
/// node.
|
||||||
|
pub fn node(&self, peer_id: &PeerId) -> Option<Node> {
|
||||||
|
self.nodes_info.get(peer_id).map(Node)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Inserts a ping time in the cache. Has no effect if we don't have any entry for that node,
|
||||||
|
/// which shouldn't happen.
|
||||||
|
fn handle_ping_report(&mut self, peer_id: &PeerId, ping_time: Duration) {
|
||||||
|
trace!(target: "sub-libp2p", "Ping time with {:?}: {:?}", peer_id, ping_time);
|
||||||
|
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
|
||||||
|
entry.latest_ping = Some(ping_time);
|
||||||
|
} else {
|
||||||
|
error!(target: "sub-libp2p",
|
||||||
|
"Received ping from node we're not connected to {:?}", peer_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Inserts an identify record in the cache. Has no effect if we don't have any entry for that
|
||||||
|
/// node, which shouldn't happen.
|
||||||
|
fn handle_identify_report(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
|
||||||
|
trace!(target: "sub-libp2p", "Identified {:?} => {:?}", peer_id, info);
|
||||||
|
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
|
||||||
|
entry.client_version = Some(info.agent_version.clone());
|
||||||
|
} else {
|
||||||
|
error!(target: "sub-libp2p",
|
||||||
|
"Received pong from node we're not connected to {:?}", peer_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gives access to the information about a node.
|
||||||
|
pub struct Node<'a>(&'a NodeInfo);
|
||||||
|
|
||||||
|
impl<'a> Node<'a> {
|
||||||
|
/// Returns the endpoint we are connected to or were last connected to.
|
||||||
|
pub fn endpoint(&self) -> &'a ConnectedPoint {
|
||||||
|
&self.0.endpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the latest version information we know of.
|
||||||
|
pub fn client_version(&self) -> Option<&'a str> {
|
||||||
|
self.0.client_version.as_ref().map(|s| &s[..])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the latest ping time we know of for this node. `None` if we never successfully
|
||||||
|
/// pinged this node.
|
||||||
|
pub fn latest_ping(&self) -> Option<Duration> {
|
||||||
|
self.0.latest_ping
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generates an arbitrary string containing debug information about the node.
|
||||||
|
pub fn debug_info(&self) -> String {
|
||||||
|
format!("(version: {:?}) through {:?}", self.0.client_version, self.0.endpoint)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Event that can be emitted by the behaviour.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum DebugInfoEvent {
|
||||||
|
/// We have obtained debug information from a peer, including the addresses it is listening
|
||||||
|
/// on.
|
||||||
|
Identified {
|
||||||
|
/// Id of the peer that has been identified.
|
||||||
|
peer_id: PeerId,
|
||||||
|
/// Information about the peer.
|
||||||
|
info: IdentifyInfo,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TSubstream> NetworkBehaviour for DebugInfoBehaviour<TSubstream>
|
||||||
|
where TSubstream: AsyncRead + AsyncWrite {
|
||||||
|
type ProtocolsHandler = IntoProtocolsHandlerSelect<
|
||||||
|
<Ping<TSubstream> as NetworkBehaviour>::ProtocolsHandler,
|
||||||
|
<Identify<TSubstream> as NetworkBehaviour>::ProtocolsHandler
|
||||||
|
>;
|
||||||
|
type OutEvent = DebugInfoEvent;
|
||||||
|
|
||||||
|
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||||
|
IntoProtocolsHandler::select(self.ping.new_handler(), self.identify.new_handler())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||||
|
let mut list = self.ping.addresses_of_peer(peer_id);
|
||||||
|
list.extend_from_slice(&self.identify.addresses_of_peer(peer_id));
|
||||||
|
list
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
|
||||||
|
self.ping.inject_connected(peer_id.clone(), endpoint.clone());
|
||||||
|
self.identify.inject_connected(peer_id.clone(), endpoint.clone());
|
||||||
|
|
||||||
|
match self.nodes_info.entry(peer_id) {
|
||||||
|
Entry::Vacant(e) => {
|
||||||
|
e.insert(NodeInfo {
|
||||||
|
info_expire: None,
|
||||||
|
endpoint,
|
||||||
|
client_version: None,
|
||||||
|
latest_ping: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Entry::Occupied(e) => {
|
||||||
|
let e = e.into_mut();
|
||||||
|
if e.info_expire.as_ref().map(|exp| *exp < Instant::now()).unwrap_or(false) {
|
||||||
|
e.client_version = None;
|
||||||
|
e.latest_ping = None;
|
||||||
|
}
|
||||||
|
e.info_expire = None;
|
||||||
|
e.endpoint = endpoint;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
|
||||||
|
self.ping.inject_disconnected(peer_id, endpoint.clone());
|
||||||
|
self.identify.inject_disconnected(peer_id, endpoint);
|
||||||
|
|
||||||
|
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
|
||||||
|
entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
|
||||||
|
} else {
|
||||||
|
error!(target: "sub-libp2p",
|
||||||
|
"Disconnected from node we were not connected to {:?}", peer_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_node_event(
|
||||||
|
&mut self,
|
||||||
|
peer_id: PeerId,
|
||||||
|
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
|
||||||
|
) {
|
||||||
|
match event {
|
||||||
|
EitherOutput::First(event) => self.ping.inject_node_event(peer_id, event),
|
||||||
|
EitherOutput::Second(event) => self.identify.inject_node_event(peer_id, event),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
|
||||||
|
self.ping.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone());
|
||||||
|
self.identify.inject_replaced(peer_id.clone(), closed_endpoint, new_endpoint.clone());
|
||||||
|
|
||||||
|
if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
|
||||||
|
entry.endpoint = new_endpoint;
|
||||||
|
} else {
|
||||||
|
error!(target: "sub-libp2p",
|
||||||
|
"Disconnected from node we were not connected to {:?}", peer_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) {
|
||||||
|
self.ping.inject_addr_reach_failure(peer_id, addr, error);
|
||||||
|
self.identify.inject_addr_reach_failure(peer_id, addr, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
|
||||||
|
self.ping.inject_dial_failure(peer_id);
|
||||||
|
self.identify.inject_dial_failure(peer_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
|
||||||
|
self.ping.inject_new_listen_addr(addr);
|
||||||
|
self.identify.inject_new_listen_addr(addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
|
||||||
|
self.ping.inject_expired_listen_addr(addr);
|
||||||
|
self.identify.inject_expired_listen_addr(addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
||||||
|
self.ping.inject_new_external_addr(addr);
|
||||||
|
self.identify.inject_new_external_addr(addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
&mut self,
|
||||||
|
params: &mut PollParameters
|
||||||
|
) -> Async<
|
||||||
|
NetworkBehaviourAction<
|
||||||
|
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
|
||||||
|
Self::OutEvent
|
||||||
|
>
|
||||||
|
> {
|
||||||
|
loop {
|
||||||
|
match self.ping.poll(params) {
|
||||||
|
Async::NotReady => break,
|
||||||
|
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => {
|
||||||
|
if let PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } = ev {
|
||||||
|
self.handle_ping_report(&peer, rtt)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
|
||||||
|
return Async::Ready(NetworkBehaviourAction::DialAddress { address }),
|
||||||
|
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
|
||||||
|
return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
|
||||||
|
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
|
||||||
|
return Async::Ready(NetworkBehaviourAction::SendEvent {
|
||||||
|
peer_id,
|
||||||
|
event: EitherOutput::First(event)
|
||||||
|
}),
|
||||||
|
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
|
||||||
|
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match self.identify.poll(params) {
|
||||||
|
Async::NotReady => break,
|
||||||
|
Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
|
||||||
|
match event {
|
||||||
|
IdentifyEvent::Identified { peer_id, info, .. } => {
|
||||||
|
self.handle_identify_report(&peer_id, &info);
|
||||||
|
let event = DebugInfoEvent::Identified { peer_id, info };
|
||||||
|
return Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
|
||||||
|
}
|
||||||
|
IdentifyEvent::Error { .. } => {}
|
||||||
|
IdentifyEvent::SendBack { result: Err(ref err), ref peer_id } =>
|
||||||
|
debug!(target: "sub-libp2p", "Error when sending back identify info \
|
||||||
|
to {:?} => {}", peer_id, err),
|
||||||
|
IdentifyEvent::SendBack { .. } => {}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
|
||||||
|
return Async::Ready(NetworkBehaviourAction::DialAddress { address }),
|
||||||
|
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
|
||||||
|
return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
|
||||||
|
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
|
||||||
|
return Async::Ready(NetworkBehaviourAction::SendEvent {
|
||||||
|
peer_id,
|
||||||
|
event: EitherOutput::Second(event)
|
||||||
|
}),
|
||||||
|
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
|
||||||
|
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
while let Ok(Async::Ready(Some(_))) = self.garbage_collect.poll() {
|
||||||
|
self.nodes_info.retain(|_, node| {
|
||||||
|
node.info_expire.as_ref().map(|exp| *exp >= Instant::now()).unwrap_or(true)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Async::NotReady
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -154,6 +154,22 @@ enum PeerState {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl PeerState {
|
||||||
|
/// True if we have an open channel with that node.
|
||||||
|
fn is_open(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
PeerState::Poisoned => false,
|
||||||
|
PeerState::Banned { .. } => false,
|
||||||
|
PeerState::PendingRequest { .. } => false,
|
||||||
|
PeerState::Requested => false,
|
||||||
|
PeerState::Disabled { open, .. } => *open,
|
||||||
|
PeerState::DisabledPendingEnable { open, .. } => *open,
|
||||||
|
PeerState::Enabled { open, .. } => *open,
|
||||||
|
PeerState::Incoming { .. } => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// State of an "incoming" message sent to the peer set manager.
|
/// State of an "incoming" message sent to the peer set manager.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct IncomingPeer {
|
struct IncomingPeer {
|
||||||
@@ -223,6 +239,16 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the list of all the peers we have an open channel to.
|
||||||
|
pub fn open_peers<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
|
||||||
|
self.peers.iter().filter(|(_, state)| state.is_open()).map(|(id, _)| id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if we have a channel open with this node.
|
||||||
|
pub fn is_open(&self, peer_id: &PeerId) -> bool {
|
||||||
|
self.peers.get(peer_id).map(|p| p.is_open()).unwrap_or(false)
|
||||||
|
}
|
||||||
|
|
||||||
/// Disconnects the given peer if we are connected to it.
|
/// Disconnects the given peer if we are connected to it.
|
||||||
pub fn disconnect_peer(&mut self, peer_id: &PeerId) {
|
pub fn disconnect_peer(&mut self, peer_id: &PeerId) {
|
||||||
debug!(target: "sub-libp2p", "External API => Disconnect {:?}", peer_id);
|
debug!(target: "sub-libp2p", "External API => Disconnect {:?}", peer_id);
|
||||||
@@ -313,21 +339,6 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns true if we have opened a protocol with the given peer.
|
|
||||||
pub fn is_open(&self, peer_id: &PeerId) -> bool {
|
|
||||||
match self.peers.get(peer_id) {
|
|
||||||
None => false,
|
|
||||||
Some(PeerState::Disabled { open, .. }) => *open,
|
|
||||||
Some(PeerState::DisabledPendingEnable { open, .. }) => *open,
|
|
||||||
Some(PeerState::Enabled { open, .. }) => *open,
|
|
||||||
Some(PeerState::Incoming { .. }) => false,
|
|
||||||
Some(PeerState::Requested) => false,
|
|
||||||
Some(PeerState::PendingRequest { .. }) => false,
|
|
||||||
Some(PeerState::Banned { .. }) => false,
|
|
||||||
Some(PeerState::Poisoned) => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sends a message to a peer.
|
/// Sends a message to a peer.
|
||||||
///
|
///
|
||||||
/// Has no effect if the custom protocol is not open with the given peer.
|
/// Has no effect if the custom protocol is not open with the given peer.
|
||||||
|
|||||||
@@ -247,6 +247,10 @@ pub struct NetworkStatePeer {
|
|||||||
pub struct NetworkStateNotConnectedPeer {
|
pub struct NetworkStateNotConnectedPeer {
|
||||||
/// List of addresses known for this node.
|
/// List of addresses known for this node.
|
||||||
pub known_addresses: HashSet<Multiaddr>,
|
pub known_addresses: HashSet<Multiaddr>,
|
||||||
|
/// Node information, as provided by the node itself, if we were ever connected to this node.
|
||||||
|
pub version_string: Option<String>,
|
||||||
|
/// Latest ping duration with this node, if we were ever connected to this node.
|
||||||
|
pub latest_ping_time: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||||
@@ -270,8 +274,8 @@ impl From<ConnectedPoint> for NetworkStatePeerEndpoint {
|
|||||||
NetworkStatePeerEndpoint::Dialing(address),
|
NetworkStatePeerEndpoint::Dialing(address),
|
||||||
ConnectedPoint::Listener { listen_addr, send_back_addr } =>
|
ConnectedPoint::Listener { listen_addr, send_back_addr } =>
|
||||||
NetworkStatePeerEndpoint::Listening {
|
NetworkStatePeerEndpoint::Listening {
|
||||||
listen_addr: listen_addr,
|
listen_addr,
|
||||||
send_back_addr: send_back_addr
|
send_back_addr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,22 +15,20 @@
|
|||||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
behaviour::Behaviour, behaviour::BehaviourOut,
|
behaviour::Behaviour,
|
||||||
transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer
|
transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer
|
||||||
};
|
};
|
||||||
use crate::custom_proto::{CustomProto, CustomProtoOut, CustomMessage, RegisteredProtocol};
|
use crate::custom_proto::{CustomProto, CustomProtoOut, CustomMessage, RegisteredProtocol};
|
||||||
use crate::{NetworkConfiguration, NonReservedPeerMode, parse_str_addr};
|
use crate::{NetworkConfiguration, NonReservedPeerMode, parse_str_addr};
|
||||||
use fnv::FnvHashMap;
|
|
||||||
use futures::{prelude::*, Stream};
|
use futures::{prelude::*, Stream};
|
||||||
use libp2p::{Multiaddr, core::swarm::NetworkBehaviour, PeerId};
|
use libp2p::{Multiaddr, core::swarm::NetworkBehaviour, PeerId};
|
||||||
use libp2p::core::{Swarm, nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox};
|
use libp2p::core::{Swarm, nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox};
|
||||||
use libp2p::core::nodes::ConnectedPoint;
|
use libp2p::core::nodes::ConnectedPoint;
|
||||||
use log::{debug, info, warn};
|
use log::{info, error, warn};
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::io::Error as IoError;
|
use std::io::Error as IoError;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
/// Starts the substrate libp2p service.
|
/// Starts the substrate libp2p service.
|
||||||
///
|
///
|
||||||
@@ -110,7 +108,6 @@ where TMessage: CustomMessage + Send + 'static {
|
|||||||
let service = Service {
|
let service = Service {
|
||||||
swarm,
|
swarm,
|
||||||
bandwidth,
|
bandwidth,
|
||||||
nodes_info: Default::default(),
|
|
||||||
injected_events: Vec::new(),
|
injected_events: Vec::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -158,57 +155,57 @@ pub enum ServiceEvent<TMessage> {
|
|||||||
/// Network service. Must be polled regularly in order for the networking to work.
|
/// Network service. Must be polled regularly in order for the networking to work.
|
||||||
pub struct Service<TMessage> where TMessage: CustomMessage {
|
pub struct Service<TMessage> where TMessage: CustomMessage {
|
||||||
/// Stream of events of the swarm.
|
/// Stream of events of the swarm.
|
||||||
swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), IoError>, Behaviour<CustomProto<TMessage, Substream<StreamMuxerBox>>, CustomProtoOut<TMessage>, Substream<StreamMuxerBox>>>,
|
swarm: Swarm<
|
||||||
|
Boxed<(PeerId, StreamMuxerBox), IoError>,
|
||||||
|
Behaviour<CustomProto<TMessage, Substream<StreamMuxerBox>>, CustomProtoOut<TMessage>, Substream<StreamMuxerBox>>
|
||||||
|
>,
|
||||||
|
|
||||||
/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
|
/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
|
||||||
bandwidth: Arc<transport::BandwidthSinks>,
|
bandwidth: Arc<transport::BandwidthSinks>,
|
||||||
|
|
||||||
/// Information about all the nodes we're connected to.
|
|
||||||
nodes_info: FnvHashMap<PeerId, NodeInfo>,
|
|
||||||
|
|
||||||
/// Events to produce on the Stream.
|
/// Events to produce on the Stream.
|
||||||
injected_events: Vec<ServiceEvent<TMessage>>,
|
injected_events: Vec<ServiceEvent<TMessage>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Information about a node we're connected to.
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct NodeInfo {
|
|
||||||
/// How we're connected to the node.
|
|
||||||
endpoint: ConnectedPoint,
|
|
||||||
/// Version reported by the remote, or `None` if unknown.
|
|
||||||
client_version: Option<String>,
|
|
||||||
/// Latest ping time with this node.
|
|
||||||
latest_ping: Option<Duration>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<TMessage> Service<TMessage>
|
impl<TMessage> Service<TMessage>
|
||||||
where TMessage: CustomMessage + Send + 'static {
|
where TMessage: CustomMessage + Send + 'static {
|
||||||
/// Returns a struct containing tons of useful information about the network.
|
/// Returns a struct containing tons of useful information about the network.
|
||||||
pub fn state(&mut self) -> NetworkState {
|
pub fn state(&mut self) -> NetworkState {
|
||||||
|
let open = self.swarm.user_protocol().open_peers().cloned().collect::<Vec<_>>();
|
||||||
|
|
||||||
let connected_peers = {
|
let connected_peers = {
|
||||||
let swarm = &mut self.swarm;
|
let swarm = &mut self.swarm;
|
||||||
self.nodes_info.iter().map(move |(peer_id, info)| {
|
open.iter().filter_map(move |peer_id| {
|
||||||
let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, peer_id)
|
let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, peer_id)
|
||||||
.into_iter().collect();
|
.into_iter().collect();
|
||||||
|
|
||||||
(peer_id.to_base58(), NetworkStatePeer {
|
let endpoint = if let Some(e) = swarm.node(peer_id).map(|i| i.endpoint()) {
|
||||||
endpoint: info.endpoint.clone().into(),
|
e.clone().into()
|
||||||
version_string: info.client_version.clone(),
|
} else {
|
||||||
latest_ping_time: info.latest_ping,
|
error!(target: "sub-libp2p", "Found state inconsistency between custom protocol \
|
||||||
|
and debug information about {:?}", peer_id);
|
||||||
|
return None
|
||||||
|
};
|
||||||
|
|
||||||
|
Some((peer_id.to_base58(), NetworkStatePeer {
|
||||||
|
endpoint,
|
||||||
|
version_string: swarm.node(peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(),
|
||||||
|
latest_ping_time: swarm.node(peer_id).and_then(|i| i.latest_ping()),
|
||||||
enabled: swarm.user_protocol().is_enabled(&peer_id),
|
enabled: swarm.user_protocol().is_enabled(&peer_id),
|
||||||
open: swarm.user_protocol().is_open(&peer_id),
|
open: swarm.user_protocol().is_open(&peer_id),
|
||||||
known_addresses,
|
known_addresses,
|
||||||
})
|
}))
|
||||||
}).collect()
|
}).collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
let not_connected_peers = {
|
let not_connected_peers = {
|
||||||
let swarm = &mut self.swarm;
|
let swarm = &mut self.swarm;
|
||||||
let nodes_info = &self.nodes_info;
|
let list = swarm.known_peers().filter(|p| !open.iter().all(|n| n != *p))
|
||||||
let list = swarm.known_peers().filter(|p| !nodes_info.contains_key(p))
|
|
||||||
.cloned().collect::<Vec<_>>();
|
.cloned().collect::<Vec<_>>();
|
||||||
list.into_iter().map(move |peer_id| {
|
list.into_iter().map(move |peer_id| {
|
||||||
(peer_id.to_base58(), NetworkStateNotConnectedPeer {
|
(peer_id.to_base58(), NetworkStateNotConnectedPeer {
|
||||||
|
version_string: swarm.node(&peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(),
|
||||||
|
latest_ping_time: swarm.node(&peer_id).and_then(|i| i.latest_ping()),
|
||||||
known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id)
|
known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id)
|
||||||
.into_iter().collect(),
|
.into_iter().collect(),
|
||||||
})
|
})
|
||||||
@@ -246,27 +243,28 @@ where TMessage: CustomMessage + Send + 'static {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the peer id of the local node.
|
/// Returns the peer id of the local node.
|
||||||
#[inline]
|
|
||||||
pub fn peer_id(&self) -> &PeerId {
|
pub fn peer_id(&self) -> &PeerId {
|
||||||
Swarm::local_peer_id(&self.swarm)
|
Swarm::local_peer_id(&self.swarm)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the list of all the peers we are connected to.
|
/// Returns the list of all the peers we are connected to.
|
||||||
#[inline]
|
|
||||||
pub fn connected_peers<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
|
pub fn connected_peers<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
|
||||||
self.nodes_info.keys()
|
self.swarm.user_protocol().open_peers()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the way we are connected to a node.
|
/// Returns the way we are connected to a node. Returns `None` if we are not connected to it.
|
||||||
#[inline]
|
|
||||||
pub fn node_endpoint(&self, peer_id: &PeerId) -> Option<&ConnectedPoint> {
|
pub fn node_endpoint(&self, peer_id: &PeerId) -> Option<&ConnectedPoint> {
|
||||||
self.nodes_info.get(peer_id).map(|info| &info.endpoint)
|
if self.swarm.user_protocol().is_open(peer_id) {
|
||||||
|
self.swarm.node(peer_id).map(|n| n.endpoint())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the client version reported by a node.
|
/// Returns the latest client version reported by a node. Can return `Some` even for nodes
|
||||||
|
/// we're not connected to.
|
||||||
pub fn node_client_version(&self, peer_id: &PeerId) -> Option<&str> {
|
pub fn node_client_version(&self, peer_id: &PeerId) -> Option<&str> {
|
||||||
self.nodes_info.get(peer_id)
|
self.swarm.node(peer_id).and_then(|n| n.client_version())
|
||||||
.and_then(|info| info.client_version.as_ref().map(|s| &s[..]))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a message to a peer using the custom protocol.
|
/// Sends a message to a peer using the custom protocol.
|
||||||
@@ -286,11 +284,7 @@ where TMessage: CustomMessage + Send + 'static {
|
|||||||
/// This is asynchronous and will not immediately close the peer.
|
/// This is asynchronous and will not immediately close the peer.
|
||||||
/// Corresponding closing events will be generated once the closing actually happens.
|
/// Corresponding closing events will be generated once the closing actually happens.
|
||||||
pub fn drop_node(&mut self, peer_id: &PeerId) {
|
pub fn drop_node(&mut self, peer_id: &PeerId) {
|
||||||
if let Some(info) = self.nodes_info.get(peer_id) {
|
self.swarm.user_protocol_mut().disconnect_peer(peer_id);
|
||||||
debug!(target: "sub-libp2p", "Dropping {:?} on purpose ({:?}, {:?})",
|
|
||||||
peer_id, info.endpoint, info.client_version);
|
|
||||||
self.swarm.user_protocol_mut().disconnect_peer(peer_id);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adds a hard-coded address for the given peer, that never expires.
|
/// Adds a hard-coded address for the given peer, that never expires.
|
||||||
@@ -300,10 +294,10 @@ where TMessage: CustomMessage + Send + 'static {
|
|||||||
|
|
||||||
/// Get debug info for a given peer.
|
/// Get debug info for a given peer.
|
||||||
pub fn peer_debug_info(&self, who: &PeerId) -> String {
|
pub fn peer_debug_info(&self, who: &PeerId) -> String {
|
||||||
if let Some(info) = self.nodes_info.get(who) {
|
if let Some(node) = self.swarm.node(who) {
|
||||||
format!("{:?} (version: {:?}) through {:?}", who, info.client_version, info.endpoint)
|
format!("{:?} {}", who, node.debug_info())
|
||||||
} else {
|
} else {
|
||||||
"unknown".to_string()
|
format!("{:?} (unknown)", who)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -311,12 +305,7 @@ where TMessage: CustomMessage + Send + 'static {
|
|||||||
fn poll_swarm(&mut self) -> Poll<Option<ServiceEvent<TMessage>>, IoError> {
|
fn poll_swarm(&mut self) -> Poll<Option<ServiceEvent<TMessage>>, IoError> {
|
||||||
loop {
|
loop {
|
||||||
match self.swarm.poll() {
|
match self.swarm.poll() {
|
||||||
Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::CustomProtocolOpen { peer_id, version, endpoint })))) => {
|
Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolOpen { peer_id, version, .. }))) => {
|
||||||
self.nodes_info.insert(peer_id.clone(), NodeInfo {
|
|
||||||
endpoint,
|
|
||||||
client_version: None,
|
|
||||||
latest_ping: None,
|
|
||||||
});
|
|
||||||
let debug_info = self.peer_debug_info(&peer_id);
|
let debug_info = self.peer_debug_info(&peer_id);
|
||||||
break Ok(Async::Ready(Some(ServiceEvent::OpenedCustomProtocol {
|
break Ok(Async::Ready(Some(ServiceEvent::OpenedCustomProtocol {
|
||||||
peer_id,
|
peer_id,
|
||||||
@@ -324,42 +313,25 @@ where TMessage: CustomMessage + Send + 'static {
|
|||||||
debug_info,
|
debug_info,
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::CustomProtocolClosed { peer_id, .. })))) => {
|
Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolClosed { peer_id, .. }))) => {
|
||||||
let debug_info = self.peer_debug_info(&peer_id);
|
let debug_info = self.peer_debug_info(&peer_id);
|
||||||
self.nodes_info.remove(&peer_id);
|
|
||||||
break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol {
|
break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol {
|
||||||
peer_id,
|
peer_id,
|
||||||
debug_info,
|
debug_info,
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::CustomMessage { peer_id, message })))) => {
|
Ok(Async::Ready(Some(CustomProtoOut::CustomMessage { peer_id, message }))) => {
|
||||||
break Ok(Async::Ready(Some(ServiceEvent::CustomMessage {
|
break Ok(Async::Ready(Some(ServiceEvent::CustomMessage {
|
||||||
peer_id,
|
peer_id,
|
||||||
message,
|
message,
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::Clogged { peer_id, messages })))) => {
|
Ok(Async::Ready(Some(CustomProtoOut::Clogged { peer_id, messages }))) => {
|
||||||
break Ok(Async::Ready(Some(ServiceEvent::Clogged {
|
break Ok(Async::Ready(Some(ServiceEvent::Clogged {
|
||||||
peer_id,
|
peer_id,
|
||||||
messages,
|
messages,
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some(BehaviourOut::Identified { peer_id, info }))) => {
|
|
||||||
// Contrary to the other events, this one can happen even on nodes which don't
|
|
||||||
// have any open custom protocol slot. Therefore it is not necessarily in the
|
|
||||||
// list.
|
|
||||||
if let Some(n) = self.nodes_info.get_mut(&peer_id) {
|
|
||||||
n.client_version = Some(info.agent_version);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(Some(BehaviourOut::PingSuccess { peer_id, ping_time }))) => {
|
|
||||||
// Contrary to the other events, this one can happen even on nodes which don't
|
|
||||||
// have any open custom protocol slot. Therefore it is not necessarily in the
|
|
||||||
// list.
|
|
||||||
if let Some(n) = self.nodes_info.get_mut(&peer_id) {
|
|
||||||
n.latest_ping = Some(ping_time);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => break Ok(Async::NotReady),
|
Ok(Async::NotReady) => break Ok(Async::NotReady),
|
||||||
Ok(Async::Ready(None)) => unreachable!("The Swarm stream never ends"),
|
Ok(Async::Ready(None)) => unreachable!("The Swarm stream never ends"),
|
||||||
Err(_) => unreachable!("The Swarm never errors"),
|
Err(_) => unreachable!("The Swarm never errors"),
|
||||||
|
|||||||
Reference in New Issue
Block a user