diff --git a/substrate/core/network-libp2p/src/behaviour.rs b/substrate/core/network-libp2p/src/behaviour.rs index 5cbad3208d..b0eb1a66eb 100644 --- a/substrate/core/network-libp2p/src/behaviour.rs +++ b/substrate/core/network-libp2p/src/behaviour.rs @@ -22,37 +22,36 @@ use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourActi use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters}; #[cfg(not(target_os = "unknown"))] use libp2p::core::swarm::toggle::Toggle; -use libp2p::identify::{Identify, IdentifyEvent, protocol::IdentifyInfo}; use libp2p::kad::{Kademlia, KademliaOut}; #[cfg(not(target_os = "unknown"))] use libp2p::mdns::{Mdns, MdnsEvent}; use libp2p::multiaddr::Protocol; -use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess}; use log::{debug, info, trace, warn}; use std::{cmp, iter, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::{Delay, clock::Clock}; use void; +mod debug_info; + /// General behaviour of the network. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourOut", poll_method = "poll")] +#[behaviour(out_event = "TBehaviourEv", poll_method = "poll")] pub struct Behaviour { /// Main protocol that handles everything except the discovery and the technicalities. user_protocol: UserBehaviourWrap, - /// Periodically ping nodes, and close the connection if it's unresponsive. - ping: Ping, + /// Periodically pings and identifies the nodes we are connected to, and store information in a + /// cache. + debug_info: debug_info::DebugInfoBehaviour, /// Discovers nodes of the network. Defined below. discovery: DiscoveryBehaviour, - /// Periodically identifies the remote and responds to incoming requests. - identify: Identify, /// Discovers nodes on the local network. #[cfg(not(target_os = "unknown"))] mdns: Toggle>, /// Queue of events to produce for the outside. #[behaviour(ignore)] - events: Vec>, + events: Vec, } impl Behaviour { @@ -64,10 +63,7 @@ impl Behaviour, enable_mdns: bool, ) -> Self { - let identify = { - let proto_version = "/substrate/1.0".to_string(); - Identify::new(proto_version, user_agent, local_public_key.clone()) - }; + let debug_info = debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()); let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id()); for (peer_id, addr) in &known_addresses { @@ -82,7 +78,7 @@ impl Behaviour Behaviour Behaviour Option { + self.debug_info.node(peer_id) + } + /// Returns a shared reference to the user protocol. pub fn user_protocol(&self) -> &TBehaviour { &self.user_protocol.0 @@ -131,73 +135,39 @@ impl Behaviour { - /// Message from the user protocol. - UserProtocol(TBehaviourEv), - - /// We have obtained debug information from a peer. - Identified { - /// Id of the peer that has been identified. - peer_id: PeerId, - /// Information about the peer. - info: IdentifyInfo, - }, - - /// We have successfully pinged a peer. - PingSuccess { - /// Id of the peer that has been pinged. - peer_id: PeerId, - /// Time it took for the ping to come back. - ping_time: Duration, - }, -} - -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess for +Behaviour { fn inject_event(&mut self, event: void::Void) { void::unreachable(event) } } -impl NetworkBehaviourEventProcess> for Behaviour { +impl NetworkBehaviourEventProcess> for +Behaviour { fn inject_event(&mut self, event: UserEventWrap) { - self.events.push(BehaviourOut::UserProtocol(event.0)); + self.events.push(event.0); } } -impl NetworkBehaviourEventProcess +impl NetworkBehaviourEventProcess for Behaviour where TBehaviour: DiscoveryNetBehaviour { - fn inject_event(&mut self, event: IdentifyEvent) { - match event { - IdentifyEvent::Identified { peer_id, mut info, .. } => { - trace!(target: "sub-libp2p", "Identified {:?} => {:?}", peer_id, info); - // TODO: ideally we would delay the first identification to when we open the custom - // protocol, so that we only report id info to the service about the nodes we - // care about (https://github.com/libp2p/rust-libp2p/issues/876) - if !info.protocol_version.contains("substrate") { - warn!(target: "sub-libp2p", "Connected to a non-Substrate node: {:?}", info); - } - if info.listen_addrs.len() > 30 { - warn!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \ - it is identified by {:?} and {:?}", peer_id, info.protocol_version, - info.agent_version - ); - info.listen_addrs.truncate(30); - } - for addr in &info.listen_addrs { - self.discovery.kademlia.add_connected_address(&peer_id, addr.clone()); - } - self.user_protocol.0.add_discovered_nodes(iter::once(peer_id.clone())); - self.events.push(BehaviourOut::Identified { peer_id, info }); - } - IdentifyEvent::Error { .. } => {} - IdentifyEvent::SendBack { result: Err(ref err), ref peer_id } => - debug!(target: "sub-libp2p", "Error when sending back identify info \ - to {:?} => {}", peer_id, err), - IdentifyEvent::SendBack { .. } => {} + fn inject_event(&mut self, event: debug_info::DebugInfoEvent) { + let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event; + if !info.protocol_version.contains("substrate") { + warn!(target: "sub-libp2p", "Connected to a non-Substrate node: {:?}", info); } + if info.listen_addrs.len() > 30 { + warn!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \ + it is identified by {:?} and {:?}", peer_id, info.protocol_version, + info.agent_version + ); + info.listen_addrs.truncate(30); + } + for addr in &info.listen_addrs { + self.discovery.kademlia.add_connected_address(&peer_id, addr.clone()); + } + self.user_protocol.0.add_discovered_nodes(iter::once(peer_id.clone())); } } @@ -224,18 +194,6 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess for Behaviour { - fn inject_event(&mut self, event: PingEvent) { - match event { - PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { - trace!(target: "sub-libp2p", "Ping time with {:?}: {:?}", peer, rtt); - self.events.push(BehaviourOut::PingSuccess { peer_id: peer, ping_time: rtt }); - } - _ => () - } - } -} - #[cfg(not(target_os = "unknown"))] impl NetworkBehaviourEventProcess for Behaviour @@ -251,7 +209,7 @@ impl NetworkBehaviourEventProcess Behaviour { - fn poll(&mut self) -> Async>> { + fn poll(&mut self) -> Async> { if !self.events.is_empty() { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))) } @@ -279,16 +237,21 @@ impl NetworkBehaviour for UserBehaviourWrap { self.0.inject_disconnected(peer_id, endpoint) } fn inject_node_event( - &mut self, - peer_id: PeerId, - event: <::Handler as ProtocolsHandler>::OutEvent - ) { + &mut self, + peer_id: PeerId, + event: <::Handler as ProtocolsHandler>::OutEvent + ) { self.0.inject_node_event(peer_id, event) } fn poll( &mut self, params: &mut PollParameters - ) -> Async::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>> { + ) -> Async< + NetworkBehaviourAction< + <::Handler as ProtocolsHandler>::InEvent, + Self::OutEvent + > + > { match self.0.poll(params) { Async::NotReady => Async::NotReady, Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => diff --git a/substrate/core/network-libp2p/src/behaviour/debug_info.rs b/substrate/core/network-libp2p/src/behaviour/debug_info.rs new file mode 100644 index 0000000000..46c7422fd7 --- /dev/null +++ b/substrate/core/network-libp2p/src/behaviour/debug_info.rs @@ -0,0 +1,329 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use fnv::FnvHashMap; +use futures::prelude::*; +use libp2p::Multiaddr; +use libp2p::core::{either::EitherOutput, PeerId, PublicKey}; +use libp2p::core::protocols_handler::{IntoProtocolsHandler, IntoProtocolsHandlerSelect, ProtocolsHandler}; +use libp2p::core::nodes::ConnectedPoint; +use libp2p::core::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use libp2p::identify::{Identify, IdentifyEvent, protocol::IdentifyInfo}; +use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess}; +use log::{debug, trace, error}; +use std::collections::hash_map::Entry; +use std::time::{Duration, Instant}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::Interval; + +/// Time after we disconnect from a node before we purge its information from the cache. +const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60); +/// Interval at which we perform garbage collection on the node info. +const GARBAGE_COLLECT_INTERVAL: Duration = Duration::from_secs(2 * 60); + +/// Implementation of `NetworkBehaviour` that holds information about nodes in cache for diagnostic +/// purposes. +pub struct DebugInfoBehaviour { + /// Periodically ping nodes, and close the connection if it's unresponsive. + ping: Ping, + /// Periodically identifies the remote and responds to incoming requests. + identify: Identify, + /// Information that we know about all nodes. + nodes_info: FnvHashMap, + /// Interval at which we perform garbage collection in `nodes_info`. + garbage_collect: Interval, +} + +/// Information about a node we're connected to. +#[derive(Debug)] +struct NodeInfo { + /// When we will remove the entry about this node from the list, or `None` if we're connected + /// to the node. + info_expire: Option, + /// How we're connected to the node. + endpoint: ConnectedPoint, + /// Version reported by the remote, or `None` if unknown. + client_version: Option, + /// Latest ping time with this node. + latest_ping: Option, +} + +impl DebugInfoBehaviour { + /// Builds a new `DebugInfoBehaviour`. + pub fn new( + user_agent: String, + local_public_key: PublicKey, + ) -> Self { + let identify = { + let proto_version = "/substrate/1.0".to_string(); + Identify::new(proto_version, user_agent, local_public_key.clone()) + }; + + DebugInfoBehaviour { + ping: Ping::new(PingConfig::new()), + identify, + nodes_info: FnvHashMap::default(), + garbage_collect: Interval::new_interval(GARBAGE_COLLECT_INTERVAL), + } + } + + /// Borrows `self` and returns a struct giving access to the information about a node. + /// + /// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes + /// we're connected to, meaning that if `None` is returned then we're not connected to that + /// node. + pub fn node(&self, peer_id: &PeerId) -> Option { + self.nodes_info.get(peer_id).map(Node) + } + + /// Inserts a ping time in the cache. Has no effect if we don't have any entry for that node, + /// which shouldn't happen. + fn handle_ping_report(&mut self, peer_id: &PeerId, ping_time: Duration) { + trace!(target: "sub-libp2p", "Ping time with {:?}: {:?}", peer_id, ping_time); + if let Some(entry) = self.nodes_info.get_mut(peer_id) { + entry.latest_ping = Some(ping_time); + } else { + error!(target: "sub-libp2p", + "Received ping from node we're not connected to {:?}", peer_id); + } + } + + /// Inserts an identify record in the cache. Has no effect if we don't have any entry for that + /// node, which shouldn't happen. + fn handle_identify_report(&mut self, peer_id: &PeerId, info: &IdentifyInfo) { + trace!(target: "sub-libp2p", "Identified {:?} => {:?}", peer_id, info); + if let Some(entry) = self.nodes_info.get_mut(peer_id) { + entry.client_version = Some(info.agent_version.clone()); + } else { + error!(target: "sub-libp2p", + "Received pong from node we're not connected to {:?}", peer_id); + } + } +} + +/// Gives access to the information about a node. +pub struct Node<'a>(&'a NodeInfo); + +impl<'a> Node<'a> { + /// Returns the endpoint we are connected to or were last connected to. + pub fn endpoint(&self) -> &'a ConnectedPoint { + &self.0.endpoint + } + + /// Returns the latest version information we know of. + pub fn client_version(&self) -> Option<&'a str> { + self.0.client_version.as_ref().map(|s| &s[..]) + } + + /// Returns the latest ping time we know of for this node. `None` if we never successfully + /// pinged this node. + pub fn latest_ping(&self) -> Option { + self.0.latest_ping + } + + /// Generates an arbitrary string containing debug information about the node. + pub fn debug_info(&self) -> String { + format!("(version: {:?}) through {:?}", self.0.client_version, self.0.endpoint) + } +} + +/// Event that can be emitted by the behaviour. +#[derive(Debug)] +pub enum DebugInfoEvent { + /// We have obtained debug information from a peer, including the addresses it is listening + /// on. + Identified { + /// Id of the peer that has been identified. + peer_id: PeerId, + /// Information about the peer. + info: IdentifyInfo, + }, +} + +impl NetworkBehaviour for DebugInfoBehaviour +where TSubstream: AsyncRead + AsyncWrite { + type ProtocolsHandler = IntoProtocolsHandlerSelect< + as NetworkBehaviour>::ProtocolsHandler, + as NetworkBehaviour>::ProtocolsHandler + >; + type OutEvent = DebugInfoEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + IntoProtocolsHandler::select(self.ping.new_handler(), self.identify.new_handler()) + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + let mut list = self.ping.addresses_of_peer(peer_id); + list.extend_from_slice(&self.identify.addresses_of_peer(peer_id)); + list + } + + fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + self.ping.inject_connected(peer_id.clone(), endpoint.clone()); + self.identify.inject_connected(peer_id.clone(), endpoint.clone()); + + match self.nodes_info.entry(peer_id) { + Entry::Vacant(e) => { + e.insert(NodeInfo { + info_expire: None, + endpoint, + client_version: None, + latest_ping: None, + }); + } + Entry::Occupied(e) => { + let e = e.into_mut(); + if e.info_expire.as_ref().map(|exp| *exp < Instant::now()).unwrap_or(false) { + e.client_version = None; + e.latest_ping = None; + } + e.info_expire = None; + e.endpoint = endpoint; + } + } + } + + fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { + self.ping.inject_disconnected(peer_id, endpoint.clone()); + self.identify.inject_disconnected(peer_id, endpoint); + + if let Some(entry) = self.nodes_info.get_mut(peer_id) { + entry.info_expire = Some(Instant::now() + CACHE_EXPIRE); + } else { + error!(target: "sub-libp2p", + "Disconnected from node we were not connected to {:?}", peer_id); + } + } + + fn inject_node_event( + &mut self, + peer_id: PeerId, + event: <::Handler as ProtocolsHandler>::OutEvent + ) { + match event { + EitherOutput::First(event) => self.ping.inject_node_event(peer_id, event), + EitherOutput::Second(event) => self.identify.inject_node_event(peer_id, event), + } + } + + fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { + self.ping.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone()); + self.identify.inject_replaced(peer_id.clone(), closed_endpoint, new_endpoint.clone()); + + if let Some(entry) = self.nodes_info.get_mut(&peer_id) { + entry.endpoint = new_endpoint; + } else { + error!(target: "sub-libp2p", + "Disconnected from node we were not connected to {:?}", peer_id); + } + } + + fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) { + self.ping.inject_addr_reach_failure(peer_id, addr, error); + self.identify.inject_addr_reach_failure(peer_id, addr, error); + } + + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + self.ping.inject_dial_failure(peer_id); + self.identify.inject_dial_failure(peer_id); + } + + fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { + self.ping.inject_new_listen_addr(addr); + self.identify.inject_new_listen_addr(addr); + } + + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + self.ping.inject_expired_listen_addr(addr); + self.identify.inject_expired_listen_addr(addr); + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + self.ping.inject_new_external_addr(addr); + self.identify.inject_new_external_addr(addr); + } + + fn poll( + &mut self, + params: &mut PollParameters + ) -> Async< + NetworkBehaviourAction< + <::Handler as ProtocolsHandler>::InEvent, + Self::OutEvent + > + > { + loop { + match self.ping.poll(params) { + Async::NotReady => break, + Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => { + if let PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } = ev { + self.handle_ping_report(&peer, rtt) + } + }, + Async::Ready(NetworkBehaviourAction::DialAddress { address }) => + return Async::Ready(NetworkBehaviourAction::DialAddress { address }), + Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => + return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => + return Async::Ready(NetworkBehaviourAction::SendEvent { + peer_id, + event: EitherOutput::First(event) + }), + Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => + return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), + } + } + + loop { + match self.identify.poll(params) { + Async::NotReady => break, + Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { + match event { + IdentifyEvent::Identified { peer_id, info, .. } => { + self.handle_identify_report(&peer_id, &info); + let event = DebugInfoEvent::Identified { peer_id, info }; + return Async::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + IdentifyEvent::Error { .. } => {} + IdentifyEvent::SendBack { result: Err(ref err), ref peer_id } => + debug!(target: "sub-libp2p", "Error when sending back identify info \ + to {:?} => {}", peer_id, err), + IdentifyEvent::SendBack { .. } => {} + } + }, + Async::Ready(NetworkBehaviourAction::DialAddress { address }) => + return Async::Ready(NetworkBehaviourAction::DialAddress { address }), + Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => + return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => + return Async::Ready(NetworkBehaviourAction::SendEvent { + peer_id, + event: EitherOutput::Second(event) + }), + Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => + return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), + } + } + + while let Ok(Async::Ready(Some(_))) = self.garbage_collect.poll() { + self.nodes_info.retain(|_, node| { + node.info_expire.as_ref().map(|exp| *exp >= Instant::now()).unwrap_or(true) + }); + } + + Async::NotReady + } +} diff --git a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs index a4a4f4aff5..0aa6ed9544 100644 --- a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs @@ -154,6 +154,22 @@ enum PeerState { }, } +impl PeerState { + /// True if we have an open channel with that node. + fn is_open(&self) -> bool { + match self { + PeerState::Poisoned => false, + PeerState::Banned { .. } => false, + PeerState::PendingRequest { .. } => false, + PeerState::Requested => false, + PeerState::Disabled { open, .. } => *open, + PeerState::DisabledPendingEnable { open, .. } => *open, + PeerState::Enabled { open, .. } => *open, + PeerState::Incoming { .. } => false, + } + } +} + /// State of an "incoming" message sent to the peer set manager. #[derive(Debug)] struct IncomingPeer { @@ -223,6 +239,16 @@ impl CustomProto { } } + /// Returns the list of all the peers we have an open channel to. + pub fn open_peers<'a>(&'a self) -> impl Iterator + 'a { + self.peers.iter().filter(|(_, state)| state.is_open()).map(|(id, _)| id) + } + + /// Returns true if we have a channel open with this node. + pub fn is_open(&self, peer_id: &PeerId) -> bool { + self.peers.get(peer_id).map(|p| p.is_open()).unwrap_or(false) + } + /// Disconnects the given peer if we are connected to it. pub fn disconnect_peer(&mut self, peer_id: &PeerId) { debug!(target: "sub-libp2p", "External API => Disconnect {:?}", peer_id); @@ -313,21 +339,6 @@ impl CustomProto { } } - /// Returns true if we have opened a protocol with the given peer. - pub fn is_open(&self, peer_id: &PeerId) -> bool { - match self.peers.get(peer_id) { - None => false, - Some(PeerState::Disabled { open, .. }) => *open, - Some(PeerState::DisabledPendingEnable { open, .. }) => *open, - Some(PeerState::Enabled { open, .. }) => *open, - Some(PeerState::Incoming { .. }) => false, - Some(PeerState::Requested) => false, - Some(PeerState::PendingRequest { .. }) => false, - Some(PeerState::Banned { .. }) => false, - Some(PeerState::Poisoned) => false, - } - } - /// Sends a message to a peer. /// /// Has no effect if the custom protocol is not open with the given peer. diff --git a/substrate/core/network-libp2p/src/lib.rs b/substrate/core/network-libp2p/src/lib.rs index 6eb0748358..540d8d7f0b 100644 --- a/substrate/core/network-libp2p/src/lib.rs +++ b/substrate/core/network-libp2p/src/lib.rs @@ -247,6 +247,10 @@ pub struct NetworkStatePeer { pub struct NetworkStateNotConnectedPeer { /// List of addresses known for this node. pub known_addresses: HashSet, + /// Node information, as provided by the node itself, if we were ever connected to this node. + pub version_string: Option, + /// Latest ping duration with this node, if we were ever connected to this node. + pub latest_ping_time: Option, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -270,8 +274,8 @@ impl From for NetworkStatePeerEndpoint { NetworkStatePeerEndpoint::Dialing(address), ConnectedPoint::Listener { listen_addr, send_back_addr } => NetworkStatePeerEndpoint::Listening { - listen_addr: listen_addr, - send_back_addr: send_back_addr + listen_addr, + send_back_addr } } } diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs index 18ccb5f139..5a6fb6978c 100644 --- a/substrate/core/network-libp2p/src/service_task.rs +++ b/substrate/core/network-libp2p/src/service_task.rs @@ -15,22 +15,20 @@ // along with Substrate. If not, see . use crate::{ - behaviour::Behaviour, behaviour::BehaviourOut, + behaviour::Behaviour, transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer }; use crate::custom_proto::{CustomProto, CustomProtoOut, CustomMessage, RegisteredProtocol}; use crate::{NetworkConfiguration, NonReservedPeerMode, parse_str_addr}; -use fnv::FnvHashMap; use futures::{prelude::*, Stream}; use libp2p::{Multiaddr, core::swarm::NetworkBehaviour, PeerId}; use libp2p::core::{Swarm, nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox}; use libp2p::core::nodes::ConnectedPoint; -use log::{debug, info, warn}; +use log::{info, error, warn}; use std::fs; use std::io::Error as IoError; use std::path::Path; use std::sync::Arc; -use std::time::Duration; /// Starts the substrate libp2p service. /// @@ -110,7 +108,6 @@ where TMessage: CustomMessage + Send + 'static { let service = Service { swarm, bandwidth, - nodes_info: Default::default(), injected_events: Vec::new(), }; @@ -158,57 +155,57 @@ pub enum ServiceEvent { /// Network service. Must be polled regularly in order for the networking to work. pub struct Service where TMessage: CustomMessage { /// Stream of events of the swarm. - swarm: Swarm, Behaviour>, CustomProtoOut, Substream>>, + swarm: Swarm< + Boxed<(PeerId, StreamMuxerBox), IoError>, + Behaviour>, CustomProtoOut, Substream> + >, /// Bandwidth logging system. Can be queried to know the average bandwidth consumed. bandwidth: Arc, - /// Information about all the nodes we're connected to. - nodes_info: FnvHashMap, - /// Events to produce on the Stream. injected_events: Vec>, } -/// Information about a node we're connected to. -#[derive(Debug)] -struct NodeInfo { - /// How we're connected to the node. - endpoint: ConnectedPoint, - /// Version reported by the remote, or `None` if unknown. - client_version: Option, - /// Latest ping time with this node. - latest_ping: Option, -} - impl Service where TMessage: CustomMessage + Send + 'static { /// Returns a struct containing tons of useful information about the network. pub fn state(&mut self) -> NetworkState { + let open = self.swarm.user_protocol().open_peers().cloned().collect::>(); + let connected_peers = { let swarm = &mut self.swarm; - self.nodes_info.iter().map(move |(peer_id, info)| { + open.iter().filter_map(move |peer_id| { let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, peer_id) .into_iter().collect(); - (peer_id.to_base58(), NetworkStatePeer { - endpoint: info.endpoint.clone().into(), - version_string: info.client_version.clone(), - latest_ping_time: info.latest_ping, + let endpoint = if let Some(e) = swarm.node(peer_id).map(|i| i.endpoint()) { + e.clone().into() + } else { + error!(target: "sub-libp2p", "Found state inconsistency between custom protocol \ + and debug information about {:?}", peer_id); + return None + }; + + Some((peer_id.to_base58(), NetworkStatePeer { + endpoint, + version_string: swarm.node(peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(), + latest_ping_time: swarm.node(peer_id).and_then(|i| i.latest_ping()), enabled: swarm.user_protocol().is_enabled(&peer_id), open: swarm.user_protocol().is_open(&peer_id), known_addresses, - }) + })) }).collect() }; let not_connected_peers = { let swarm = &mut self.swarm; - let nodes_info = &self.nodes_info; - let list = swarm.known_peers().filter(|p| !nodes_info.contains_key(p)) + let list = swarm.known_peers().filter(|p| !open.iter().all(|n| n != *p)) .cloned().collect::>(); list.into_iter().map(move |peer_id| { (peer_id.to_base58(), NetworkStateNotConnectedPeer { + version_string: swarm.node(&peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(), + latest_ping_time: swarm.node(&peer_id).and_then(|i| i.latest_ping()), known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id) .into_iter().collect(), }) @@ -246,27 +243,28 @@ where TMessage: CustomMessage + Send + 'static { } /// Returns the peer id of the local node. - #[inline] pub fn peer_id(&self) -> &PeerId { Swarm::local_peer_id(&self.swarm) } /// Returns the list of all the peers we are connected to. - #[inline] pub fn connected_peers<'a>(&'a self) -> impl Iterator + 'a { - self.nodes_info.keys() + self.swarm.user_protocol().open_peers() } - /// Returns the way we are connected to a node. - #[inline] + /// Returns the way we are connected to a node. Returns `None` if we are not connected to it. pub fn node_endpoint(&self, peer_id: &PeerId) -> Option<&ConnectedPoint> { - self.nodes_info.get(peer_id).map(|info| &info.endpoint) + if self.swarm.user_protocol().is_open(peer_id) { + self.swarm.node(peer_id).map(|n| n.endpoint()) + } else { + None + } } - /// Returns the client version reported by a node. + /// Returns the latest client version reported by a node. Can return `Some` even for nodes + /// we're not connected to. pub fn node_client_version(&self, peer_id: &PeerId) -> Option<&str> { - self.nodes_info.get(peer_id) - .and_then(|info| info.client_version.as_ref().map(|s| &s[..])) + self.swarm.node(peer_id).and_then(|n| n.client_version()) } /// Sends a message to a peer using the custom protocol. @@ -286,11 +284,7 @@ where TMessage: CustomMessage + Send + 'static { /// This is asynchronous and will not immediately close the peer. /// Corresponding closing events will be generated once the closing actually happens. pub fn drop_node(&mut self, peer_id: &PeerId) { - if let Some(info) = self.nodes_info.get(peer_id) { - debug!(target: "sub-libp2p", "Dropping {:?} on purpose ({:?}, {:?})", - peer_id, info.endpoint, info.client_version); - self.swarm.user_protocol_mut().disconnect_peer(peer_id); - } + self.swarm.user_protocol_mut().disconnect_peer(peer_id); } /// Adds a hard-coded address for the given peer, that never expires. @@ -300,10 +294,10 @@ where TMessage: CustomMessage + Send + 'static { /// Get debug info for a given peer. pub fn peer_debug_info(&self, who: &PeerId) -> String { - if let Some(info) = self.nodes_info.get(who) { - format!("{:?} (version: {:?}) through {:?}", who, info.client_version, info.endpoint) + if let Some(node) = self.swarm.node(who) { + format!("{:?} {}", who, node.debug_info()) } else { - "unknown".to_string() + format!("{:?} (unknown)", who) } } @@ -311,12 +305,7 @@ where TMessage: CustomMessage + Send + 'static { fn poll_swarm(&mut self) -> Poll>, IoError> { loop { match self.swarm.poll() { - Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::CustomProtocolOpen { peer_id, version, endpoint })))) => { - self.nodes_info.insert(peer_id.clone(), NodeInfo { - endpoint, - client_version: None, - latest_ping: None, - }); + Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolOpen { peer_id, version, .. }))) => { let debug_info = self.peer_debug_info(&peer_id); break Ok(Async::Ready(Some(ServiceEvent::OpenedCustomProtocol { peer_id, @@ -324,42 +313,25 @@ where TMessage: CustomMessage + Send + 'static { debug_info, }))) } - Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::CustomProtocolClosed { peer_id, .. })))) => { + Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolClosed { peer_id, .. }))) => { let debug_info = self.peer_debug_info(&peer_id); - self.nodes_info.remove(&peer_id); break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol { peer_id, debug_info, }))) } - Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::CustomMessage { peer_id, message })))) => { + Ok(Async::Ready(Some(CustomProtoOut::CustomMessage { peer_id, message }))) => { break Ok(Async::Ready(Some(ServiceEvent::CustomMessage { peer_id, message, }))) } - Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::Clogged { peer_id, messages })))) => { + Ok(Async::Ready(Some(CustomProtoOut::Clogged { peer_id, messages }))) => { break Ok(Async::Ready(Some(ServiceEvent::Clogged { peer_id, messages, }))) } - Ok(Async::Ready(Some(BehaviourOut::Identified { peer_id, info }))) => { - // Contrary to the other events, this one can happen even on nodes which don't - // have any open custom protocol slot. Therefore it is not necessarily in the - // list. - if let Some(n) = self.nodes_info.get_mut(&peer_id) { - n.client_version = Some(info.agent_version); - } - } - Ok(Async::Ready(Some(BehaviourOut::PingSuccess { peer_id, ping_time }))) => { - // Contrary to the other events, this one can happen even on nodes which don't - // have any open custom protocol slot. Therefore it is not necessarily in the - // list. - if let Some(n) = self.nodes_info.get_mut(&peer_id) { - n.latest_ping = Some(ping_time); - } - } Ok(Async::NotReady) => break Ok(Async::NotReady), Ok(Async::Ready(None)) => unreachable!("The Swarm stream never ends"), Err(_) => unreachable!("The Swarm never errors"),