diff --git a/substrate/core/network-libp2p/src/behaviour.rs b/substrate/core/network-libp2p/src/behaviour.rs index da1282b88c..ba79e0ef32 100644 --- a/substrate/core/network-libp2p/src/behaviour.rs +++ b/substrate/core/network-libp2p/src/behaviour.rs @@ -98,6 +98,11 @@ impl Behaviour { self.custom_protocols.cleanup(); } + /// Returns the list of reserved nodes. + pub fn reserved_peers(&self) -> impl Iterator { + self.custom_protocols.reserved_peers() + } + /// Try to add a reserved peer. pub fn add_reserved_peer(&mut self, peer_id: PeerId, addr: Multiaddr) { self.custom_protocols.add_reserved_peer(peer_id, addr) @@ -111,6 +116,11 @@ impl Behaviour { self.custom_protocols.remove_reserved_peer(peer_id) } + /// Returns true if we only accept reserved nodes. + pub fn is_reserved_only(&self) -> bool { + self.custom_protocols.is_reserved_only() + } + /// Start accepting all peers again if we weren't. pub fn accept_unreserved_peers(&mut self) { self.custom_protocols.accept_unreserved_peers() @@ -129,6 +139,21 @@ impl Behaviour { self.custom_protocols.ban_peer(peer_id) } + /// Returns a list of all the peers that are banned, and until when. + pub fn banned_nodes(&self) -> impl Iterator { + self.custom_protocols.banned_peers() + } + + /// Returns true if we try to open protocols with the given peer. + pub fn is_enabled(&self, peer_id: &PeerId) -> bool { + self.custom_protocols.is_enabled(peer_id) + } + + /// Returns the list of protocols we have open with the given peer. + pub fn open_protocols<'a>(&'a self, peer_id: &'a PeerId) -> impl Iterator + 'a { + self.custom_protocols.open_protocols(peer_id) + } + /// Disconnects the custom protocols from a peer. /// /// The peer will still be able to use Kademlia or other protocols, but will get disconnected @@ -142,6 +167,16 @@ impl Behaviour { pub fn drop_node(&mut self, peer_id: &PeerId) { self.custom_protocols.disconnect_peer(peer_id) } + + /// Returns the list of peers in the topology. + pub fn known_peers(&self) -> impl Iterator { + self.custom_protocols.known_peers() + } + + /// Returns a list of addresses known for this peer, and their reputation score. + pub fn known_addresses(&mut self, peer_id: &PeerId) -> impl Iterator { + self.custom_protocols.known_addresses(peer_id) + } } /// Event that can be emitted by the behaviour. @@ -196,6 +231,14 @@ pub enum BehaviourOut { /// Information about the peer. info: IdentifyInfo, }, + + /// We have successfully pinged a peer. + PingSuccess { + /// Id of the peer that has been pinged. + peer_id: PeerId, + /// Time it took for the ping to come back. + ping_time: Duration, + }, } impl From> for BehaviourOut { @@ -290,6 +333,7 @@ impl NetworkBehaviourEventProcess for Behaviour match event { PingEvent::PingSuccess { peer, time } => { trace!(target: "sub-libp2p", "Ping time with {:?}: {:?}", peer, time); + self.events.push(BehaviourOut::PingSuccess { peer_id: peer, ping_time: time }); } } } diff --git a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs index d946fe700e..70787d89e1 100644 --- a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs @@ -177,6 +177,11 @@ impl CustomProtos { } } + /// Returns the list of reserved nodes. + pub fn reserved_peers(&self) -> impl Iterator { + self.reserved_peers.iter() + } + /// Adds a reserved peer. pub fn add_reserved_peer(&mut self, peer_id: PeerId, addr: Multiaddr) { self.topology.add_bootstrap_addr(&peer_id, addr); @@ -194,6 +199,11 @@ impl CustomProtos { self.reserved_peers.remove(&peer_id); } + /// Returns true if we only accept reserved nodes. + pub fn is_reserved_only(&self) -> bool { + self.reserved_only + } + /// Start accepting all peers again if we weren't. pub fn accept_unreserved_peers(&mut self) { if !self.reserved_only { @@ -258,6 +268,24 @@ impl CustomProtos { } } + /// Returns a list of all the peers that are banned, and until when. + pub fn banned_peers(&self) -> impl Iterator { + self.banned_peers.iter().map(|&(ref id, until)| (id, until)) + } + + /// Returns true if we try to open protocols with the given peer. + pub fn is_enabled(&self, peer_id: &PeerId) -> bool { + self.enabled_peers.contains_key(peer_id) + } + + /// Returns the list of protocols we have open with the given peer. + pub fn open_protocols<'a>(&'a self, peer_id: &'a PeerId) -> impl Iterator + 'a { + self.open_protocols + .iter() + .filter(move |(p, _)| p == peer_id) + .map(|(_, proto)| *proto) + } + /// Sends a message to a peer using the given custom protocol. /// /// Has no effect if the custom protocol is not open with the given peer. @@ -303,6 +331,16 @@ impl CustomProtos { self.topology.cleanup(); } + /// Returns the list of peers in the topology. + pub fn known_peers(&self) -> impl Iterator { + self.topology.known_peers() + } + + /// Returns a list of addresses known for this peer, and their reputation score. + pub fn known_addresses(&mut self, peer_id: &PeerId) -> impl Iterator { + self.topology.addresses_of_peer(peer_id, true) + } + /// Updates the attempted connections to nodes. /// /// Also updates `next_connect_to_nodes` with the earliest known moment when we need to @@ -381,7 +419,7 @@ where } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - self.topology.addresses_of_peer(peer_id) + self.topology.addresses_of_peer(peer_id, false).map(|(a, _)| a.clone()).collect() } fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { diff --git a/substrate/core/network-libp2p/src/custom_proto/topology.rs b/substrate/core/network-libp2p/src/custom_proto/topology.rs index 4ad0e8d145..14792e2f57 100644 --- a/substrate/core/network-libp2p/src/custom_proto/topology.rs +++ b/substrate/core/network-libp2p/src/custom_proto/topology.rs @@ -307,21 +307,25 @@ impl NetTopology { anything_changed } - /// Returns the addresses stored for a specific peer. + /// Returns the list of peers that are stored in the topology. #[inline] - pub fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { - let peer = if let Some(peer) = self.store.get_mut(peer) { - peer - } else { - return Vec::new() - }; + pub fn known_peers(&self) -> impl Iterator { + self.store.keys() + } + /// Returns the addresses stored for a specific peer, and their reputation score. + /// + /// If `include_expired` is true, includes expired addresses that shouldn't be taken into + /// account when dialing. + #[inline] + pub fn addresses_of_peer(&mut self, peer: &PeerId, include_expired: bool) + -> impl Iterator { let now_st = SystemTime::now(); let now_is = Instant::now(); - let mut list = peer.addrs.iter_mut().filter_map(move |addr| { + let mut list = self.store.get_mut(peer).into_iter().flat_map(|p| p.addrs.iter_mut()).filter_map(move |addr| { let (score, connected) = addr.score_and_is_connected(); - if (addr.expires >= now_st && score > 0 && addr.back_off_until < now_is) || connected { + if include_expired || (addr.expires >= now_st && score > 0 && addr.back_off_until < now_is) || connected { Some((score, &addr.addr)) } else { None @@ -329,7 +333,7 @@ impl NetTopology { }).collect::>(); list.sort_by(|a, b| a.0.cmp(&b.0)); // TODO: meh, optimize - list.into_iter().map(|(_, addr)| addr.clone()).collect::>() + list.into_iter().map(|(score, addr)| (addr, score)) } /// Marks the given peer as connected through the given endpoint. diff --git a/substrate/core/network-libp2p/src/lib.rs b/substrate/core/network-libp2p/src/lib.rs index a6f66c6026..8986afe100 100644 --- a/substrate/core/network-libp2p/src/lib.rs +++ b/substrate/core/network-libp2p/src/lib.rs @@ -32,13 +32,9 @@ pub use crate::traits::{NetworkConfiguration, NodeIndex, NodeId, NonReservedPeer pub use crate::traits::{ProtocolId, Secret, Severity}; pub use libp2p::{Multiaddr, multiaddr::Protocol, build_multiaddr, PeerId, core::PublicKey}; -/// Check if node url is valid -pub fn validate_node_url(url: &str) -> Result<(), Error> { - match url.parse::() { - Ok(_) => Ok(()), - Err(_) => Err(ErrorKind::InvalidNodeId.into()), - } -} +use libp2p::core::nodes::ConnectedPoint; +use serde_derive::Serialize; +use std::{collections::{HashMap, HashSet}, time::Duration}; /// Parses a string address and returns the component, if valid. pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), Error> { @@ -50,3 +46,85 @@ pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), Error> { }; Ok((who, addr)) } + +/// Returns general information about the networking. +/// +/// Meant for general diagnostic purposes. +/// +/// **Warning**: This API is not stable. +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct NetworkState { + /// PeerId of the local node. + pub peer_id: String, + /// List of addresses the node is currently listening on. + pub listened_addresses: HashSet, + // TODO (https://github.com/libp2p/rust-libp2p/issues/978): external_addresses: Vec, + /// If true, we only accept reserved peers. + pub is_reserved_only: bool, + /// PeerIds of the nodes that are marked as reserved. + pub reserved_peers: HashSet, + /// PeerIds of the nodes that are banned, and how long in the seconds the ban remains. + pub banned_peers: HashMap, + /// List of node we're connected to. + pub connected_peers: HashMap, + /// List of node that we know of but that we're not connected to. + pub not_connected_peers: HashMap, + /// Downloaded bytes per second averaged over the past few seconds. + pub average_download_per_sec: u64, + /// Uploaded bytes per second averaged over the past few seconds. + pub average_upload_per_sec: u64, +} + +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct NetworkStatePeer { + /// How we are connected to the node. + pub endpoint: NetworkStatePeerEndpoint, + /// Node information, as provided by the node itself. Can be empty if not known yet. + pub version_string: Option, + /// Latest ping duration with this node. + pub latest_ping_time: Option, + /// If true, the peer is "enabled", which means that we try to open Substrate-related protocols + /// with this peer. If false, we stick to Kademlia and/or other network-only protocols. + pub enabled: bool, + /// List of protocols that we have open with the given peer. + pub open_protocols: HashSet, + /// List of addresses known for this node, with its reputation score. + pub known_addresses: HashMap, +} + +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct NetworkStateNotConnectedPeer { + /// List of addresses known for this node, with its reputation score. + pub known_addresses: HashMap, +} + +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum NetworkStatePeerEndpoint { + /// We are dialing the given address. + Dialing(Multiaddr), + /// We are listening. + Listening { + /// Address we're listening on that received the connection. + listen_addr: Multiaddr, + /// Address data is sent back to. + send_back_addr: Multiaddr, + }, +} + +impl From for NetworkStatePeerEndpoint { + fn from(endpoint: ConnectedPoint) -> Self { + match endpoint { + ConnectedPoint::Dialer { ref address } => + NetworkStatePeerEndpoint::Dialing(address.clone()), + ConnectedPoint::Listener { ref listen_addr, ref send_back_addr } => + NetworkStatePeerEndpoint::Listening { + listen_addr: listen_addr.clone(), + send_back_addr: send_back_addr.clone() + } + } + } +} diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs index 77948d29a3..8c385878dc 100644 --- a/substrate/core/network-libp2p/src/service_task.rs +++ b/substrate/core/network-libp2p/src/service_task.rs @@ -16,7 +16,7 @@ use crate::{ behaviour::Behaviour, behaviour::BehaviourOut, secret::obtain_private_key_from_config, - transport + transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer }; use crate::custom_proto::{CustomMessage, RegisteredProtocol, RegisteredProtocols}; use crate::{Error, NetworkConfiguration, NodeIndex, ProtocolId, parse_str_addr}; @@ -32,7 +32,7 @@ use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio_timer::Interval; /// Starts the substrate libp2p service. @@ -223,10 +223,63 @@ struct NodeInfo { endpoint: ConnectedPoint, /// Version reported by the remote, or `None` if unknown. client_version: Option, + /// Latest ping time with this node. + latest_ping: Option, } impl Service where TMessage: CustomMessage + Send + 'static { + /// Returns a struct containing tons of useful information about the network. + pub fn state(&mut self) -> NetworkState { + let now = Instant::now(); + + let connected_peers = { + let swarm = &mut self.swarm; + self.nodes_info.values().map(move |info| { + let known_addresses = swarm.known_addresses(&info.peer_id) + .map(|(a, s)| (a.clone(), s)).collect(); + + (info.peer_id.to_base58(), NetworkStatePeer { + endpoint: info.endpoint.clone().into(), + version_string: info.client_version.clone(), + latest_ping_time: info.latest_ping, + enabled: swarm.is_enabled(&info.peer_id), + open_protocols: swarm.open_protocols(&info.peer_id).collect(), + known_addresses, + }) + }).collect() + }; + + let not_connected_peers = { + let swarm = &mut self.swarm; + let index_by_id = &self.index_by_id; + let list = swarm.known_peers().filter(|p| !index_by_id.contains_key(p)) + .cloned().collect::>(); + list.into_iter().map(move |peer_id| { + let known_addresses = swarm.known_addresses(&peer_id) + .map(|(a, s)| (a.clone(), s)).collect(); + (peer_id.to_base58(), NetworkStateNotConnectedPeer { + known_addresses, + }) + }).collect() + }; + + NetworkState { + peer_id: Swarm::local_peer_id(&self.swarm).to_base58(), + listened_addresses: Swarm::listeners(&self.swarm).cloned().collect(), + reserved_peers: self.swarm.reserved_peers().map(|p| p.to_base58()).collect(), + banned_peers: self.swarm.banned_nodes().map(|(p, until)| { + let dur = if until > now { until - now } else { Duration::new(0, 0) }; + (p.to_base58(), dur.as_secs()) + }).collect(), + is_reserved_only: self.swarm.is_reserved_only(), + average_download_per_sec: self.bandwidth.average_download_per_sec(), + average_upload_per_sec: self.bandwidth.average_upload_per_sec(), + connected_peers, + not_connected_peers, + } + } + /// Returns an iterator that produces the list of addresses we're listening on. #[inline] pub fn listeners(&self) -> impl Iterator { @@ -360,6 +413,7 @@ where TMessage: CustomMessage + Send + 'static { peer_id: entry.key().clone(), endpoint, client_version: None, + latest_ping: None, }); id }, @@ -370,6 +424,7 @@ where TMessage: CustomMessage + Send + 'static { peer_id: entry.key().clone(), endpoint, client_version: None, + latest_ping: None, }); entry.insert(id); id @@ -427,6 +482,16 @@ where TMessage: CustomMessage + Send + 'static { .client_version = Some(info.agent_version); } } + Ok(Async::Ready(Some(BehaviourOut::PingSuccess { peer_id, ping_time }))) => { + // Contrary to the other events, this one can happen even on nodes which don't + // have any open custom protocol slot. Therefore it is not necessarily in the + // list. + if let Some(id) = self.index_by_id.get(&peer_id) { + self.nodes_info.get_mut(id) + .expect("index_by_id and nodes_info are always kept in sync; QED") + .latest_ping = Some(ping_time); + } + } Ok(Async::NotReady) => break Ok(Async::NotReady), Ok(Async::Ready(None)) => unreachable!("The Swarm stream never ends"), Err(_) => unreachable!("The Swarm never errors"), diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index 31c90cb311..8e00a66e63 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -43,6 +43,7 @@ pub use protocol::{ProtocolStatus, PeerInfo, Context}; pub use sync::{Status as SyncStatus, SyncState}; pub use network_libp2p::{ NodeIndex, ProtocolId, Severity, Protocol, Multiaddr, + NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer, NetworkStatePeerEndpoint, obtain_private_key, build_multiaddr, PeerId, PublicKey }; pub use message::{generic as generic_message, RequestId, Status as StatusMessage, ConsensusEngineId}; diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 3ccb9c1277..23eb1229ab 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -23,7 +23,7 @@ use futures::{Async, Future, Stream, stream, sync::oneshot}; use parking_lot::{Mutex, RwLock}; use network_libp2p::{ProtocolId, NetworkConfiguration, NodeIndex, ErrorKind, Severity}; use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; -use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol}; +use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol, NetworkState}; use consensus::import_queue::{ImportQueue, Link}; use crate::consensus_gossip::ConsensusGossip; use crate::message::{Message, ConsensusEngineId}; @@ -46,6 +46,8 @@ pub type FetchFuture = oneshot::Receiver>; pub trait SyncProvider: Send + Sync { /// Get sync status fn status(&self) -> ProtocolStatus; + /// Get network state. + fn network_state(&self) -> NetworkState; /// Get currently connected peers fn peers(&self) -> Vec<(NodeIndex, PeerInfo)>; } @@ -290,6 +292,10 @@ impl> SyncProvider for Servi 2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent.") } + fn network_state(&self) -> NetworkState { + self.network.lock().state() + } + fn peers(&self) -> Vec<(NodeIndex, PeerInfo)> { let peers = (*self.peers.read()).clone(); peers.into_iter().map(|(idx, connected)| (idx, connected.peer_info)).collect() diff --git a/substrate/core/rpc/src/system/mod.rs b/substrate/core/rpc/src/system/mod.rs index a7684f2379..57abae0019 100644 --- a/substrate/core/rpc/src/system/mod.rs +++ b/substrate/core/rpc/src/system/mod.rs @@ -60,6 +60,13 @@ pub trait SystemApi { /// Returns currently connected peers #[rpc(name = "system_peers")] fn system_peers(&self) -> Result>>; + + /// Returns current state of the network. + /// + /// **Warning**: This API is not stable. + // TODO: make this stable and move structs https://github.com/paritytech/substrate/issues/1890 + #[rpc(name = "system_networkState")] + fn system_network_state(&self) -> Result; } /// System API implementation @@ -120,4 +127,8 @@ impl SystemApi::Number> for Sy best_number: p.best_number, }).collect()) } + + fn system_network_state(&self) -> Result { + Ok(self.sync.network_state()) + } } diff --git a/substrate/core/rpc/src/system/tests.rs b/substrate/core/rpc/src/system/tests.rs index 18c753f533..d1ca9339dc 100644 --- a/substrate/core/rpc/src/system/tests.rs +++ b/substrate/core/rpc/src/system/tests.rs @@ -52,6 +52,20 @@ impl network::SyncProvider for Status { } } + fn network_state(&self) -> network::NetworkState { + network::NetworkState { + peer_id: String::new(), + listened_addresses: Default::default(), + is_reserved_only: false, + reserved_peers: Default::default(), + banned_peers: Default::default(), + connected_peers: Default::default(), + not_connected_peers: Default::default(), + average_download_per_sec: 0, + average_upload_per_sec: 0, + } + } + fn peers(&self) -> Vec<(NodeIndex, NetworkPeerInfo)> { vec![(1, NetworkPeerInfo { peer_id: self.peer_id.clone(), @@ -181,3 +195,21 @@ fn system_peers() { }] ); } + +#[test] +fn system_network_state() { + assert_eq!( + api(None).system_network_state().unwrap(), + network::NetworkState { + peer_id: String::new(), + listened_addresses: Default::default(), + is_reserved_only: false, + reserved_peers: Default::default(), + banned_peers: Default::default(), + connected_peers: Default::default(), + not_connected_peers: Default::default(), + average_download_per_sec: 0, + average_upload_per_sec: 0, + } + ); +}