// Copyright 2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . use crate::DiscoveryNetBehaviour; use futures::prelude::*; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, protocols_handler::IntoProtocolsHandler, PublicKey}; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters}; #[cfg(not(target_os = "unknown"))] use libp2p::core::swarm::toggle::Toggle; use libp2p::kad::{Kademlia, KademliaOut}; #[cfg(not(target_os = "unknown"))] use libp2p::mdns::{Mdns, MdnsEvent}; use libp2p::multiaddr::Protocol; use log::{debug, info, trace, warn}; use std::{cmp, iter, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::{Delay, clock::Clock}; use void; mod debug_info; /// General behaviour of the network. #[derive(NetworkBehaviour)] #[behaviour(out_event = "TBehaviourEv", poll_method = "poll")] pub struct Behaviour { /// Main protocol that handles everything except the discovery and the technicalities. user_protocol: UserBehaviourWrap, /// Periodically pings and identifies the nodes we are connected to, and store information in a /// cache. debug_info: debug_info::DebugInfoBehaviour, /// Discovers nodes of the network. Defined below. discovery: DiscoveryBehaviour, /// Discovers nodes on the local network. #[cfg(not(target_os = "unknown"))] mdns: Toggle>, /// Queue of events to produce for the outside. #[behaviour(ignore)] events: Vec, } impl Behaviour { /// Builds a new `Behaviour`. pub fn new( user_protocol: TBehaviour, user_agent: String, local_public_key: PublicKey, known_addresses: Vec<(PeerId, Multiaddr)>, enable_mdns: bool, ) -> Self { let debug_info = debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()); let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id()); for (peer_id, addr) in &known_addresses { kademlia.add_connected_address(peer_id, addr.clone()); } if enable_mdns { #[cfg(target_os = "unknown")] warn!(target: "sub-libp2p", "mDNS is not available on this platform"); } let clock = Clock::new(); Behaviour { user_protocol: UserBehaviourWrap(user_protocol), debug_info, discovery: DiscoveryBehaviour { user_defined: known_addresses, kademlia, next_kad_random_query: Delay::new(clock.now()), duration_to_next_kad: Duration::from_secs(1), clock, local_peer_id: local_public_key.into_peer_id(), }, #[cfg(not(target_os = "unknown"))] mdns: if enable_mdns { match Mdns::new() { Ok(mdns) => Some(mdns).into(), Err(err) => { warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err); None.into() } } } else { None.into() }, events: Vec::new(), } } /// Returns the list of nodes that we know exist in the network. pub fn known_peers(&self) -> impl Iterator { self.discovery.kademlia.kbuckets_entries() } /// Adds a hard-coded address for the given peer, that never expires. pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { if self.discovery.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) { self.discovery.user_defined.push((peer_id, addr)); } } /// Borrows `self` and returns a struct giving access to the information about a node. /// /// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes /// we're connected to, meaning that if `None` is returned then we're not connected to that /// node. pub fn node(&self, peer_id: &PeerId) -> Option { self.debug_info.node(peer_id) } /// Returns a shared reference to the user protocol. pub fn user_protocol(&self) -> &TBehaviour { &self.user_protocol.0 } /// Returns a mutable reference to the user protocol. pub fn user_protocol_mut(&mut self) -> &mut TBehaviour { &mut self.user_protocol.0 } } impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: void::Void) { void::unreachable(event) } } impl NetworkBehaviourEventProcess> for Behaviour { fn inject_event(&mut self, event: UserEventWrap) { self.events.push(event.0); } } impl NetworkBehaviourEventProcess for Behaviour where TBehaviour: DiscoveryNetBehaviour { fn inject_event(&mut self, event: debug_info::DebugInfoEvent) { let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event; if !info.protocol_version.contains("substrate") { warn!(target: "sub-libp2p", "Connected to a non-Substrate node: {:?}", info); } if info.listen_addrs.len() > 30 { warn!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \ it is identified by {:?} and {:?}", peer_id, info.protocol_version, info.agent_version ); info.listen_addrs.truncate(30); } for addr in &info.listen_addrs { self.discovery.kademlia.add_connected_address(&peer_id, addr.clone()); } self.user_protocol.0.add_discovered_nodes(iter::once(peer_id.clone())); } } impl NetworkBehaviourEventProcess for Behaviour where TBehaviour: DiscoveryNetBehaviour { fn inject_event(&mut self, out: KademliaOut) { match out { KademliaOut::Discovered { .. } => {} KademliaOut::KBucketAdded { peer_id, .. } => { self.user_protocol.0.add_discovered_nodes(iter::once(peer_id)); } KademliaOut::FindNodeResult { key, closer_peers } => { trace!(target: "sub-libp2p", "Libp2p => Query for {:?} yielded {:?} results", key, closer_peers.len()); if closer_peers.is_empty() { warn!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \ results"); } } // We never start any GET_PROVIDERS query. KademliaOut::GetProvidersResult { .. } => () } } } #[cfg(not(target_os = "unknown"))] impl NetworkBehaviourEventProcess for Behaviour where TBehaviour: DiscoveryNetBehaviour { fn inject_event(&mut self, event: MdnsEvent) { match event { MdnsEvent::Discovered(list) => { self.user_protocol.0.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id)); }, MdnsEvent::Expired(_) => {} } } } impl Behaviour { fn poll(&mut self) -> Async> { if !self.events.is_empty() { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))) } Async::NotReady } } /// Because of limitations with the network behaviour custom derive and trait impl duplication, we /// have to wrap the user protocol into a struct. pub struct UserBehaviourWrap(TInner); /// Event produced by `UserBehaviourWrap`. pub struct UserEventWrap(TInner); impl NetworkBehaviour for UserBehaviourWrap { type ProtocolsHandler = TInner::ProtocolsHandler; type OutEvent = UserEventWrap; fn new_handler(&mut self) -> Self::ProtocolsHandler { self.0.new_handler() } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { self.0.addresses_of_peer(peer_id) } fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { self.0.inject_connected(peer_id, endpoint) } fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { self.0.inject_disconnected(peer_id, endpoint) } fn inject_node_event( &mut self, peer_id: PeerId, event: <::Handler as ProtocolsHandler>::OutEvent ) { self.0.inject_node_event(peer_id, event) } fn poll( &mut self, params: &mut PollParameters ) -> Async< NetworkBehaviourAction< <::Handler as ProtocolsHandler>::InEvent, Self::OutEvent > > { match self.0.poll(params) { Async::NotReady => Async::NotReady, Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => Async::Ready(NetworkBehaviourAction::GenerateEvent(UserEventWrap(ev))), Async::Ready(NetworkBehaviourAction::DialAddress { address }) => Async::Ready(NetworkBehaviourAction::DialAddress { address }), Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }), Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), } } fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { self.0.inject_replaced(peer_id, closed_endpoint, new_endpoint) } fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) { self.0.inject_addr_reach_failure(peer_id, addr, error) } fn inject_dial_failure(&mut self, peer_id: &PeerId) { self.0.inject_dial_failure(peer_id) } fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { self.0.inject_new_listen_addr(addr) } fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { self.0.inject_expired_listen_addr(addr) } fn inject_new_external_addr(&mut self, addr: &Multiaddr) { self.0.inject_new_external_addr(addr) } } /// Implementation of `NetworkBehaviour` that discovers the nodes on the network. pub struct DiscoveryBehaviour { /// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and /// reserved nodes. user_defined: Vec<(PeerId, Multiaddr)>, /// Kademlia requests and answers. kademlia: Kademlia, /// Stream that fires when we need to perform the next random Kademlia query. next_kad_random_query: Delay, /// After `next_kad_random_query` triggers, the next one triggers after this duration. duration_to_next_kad: Duration, /// `Clock` instance that uses the current execution context's source of time. clock: Clock, /// Identity of our local node. local_peer_id: PeerId, } impl NetworkBehaviour for DiscoveryBehaviour where TSubstream: AsyncRead + AsyncWrite, { type ProtocolsHandler = as NetworkBehaviour>::ProtocolsHandler; type OutEvent = as NetworkBehaviour>::OutEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { NetworkBehaviour::new_handler(&mut self.kademlia) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { let mut list = self.user_defined.iter() .filter_map(|(p, a)| if p == peer_id { Some(a.clone()) } else { None }) .collect::>(); list.extend(self.kademlia.addresses_of_peer(peer_id)); trace!(target: "sub-libp2p", "Addresses of {:?} are {:?}", peer_id, list); if list.is_empty() { if self.kademlia.kbuckets_entries().any(|p| p == peer_id) { debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer in k-buckets), \ and no address was found", peer_id); } else { debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer not in k-buckets), \ and no address was found", peer_id); } } list } fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { NetworkBehaviour::inject_connected(&mut self.kademlia, peer_id, endpoint) } fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id, endpoint) } fn inject_replaced(&mut self, peer_id: PeerId, closed: ConnectedPoint, opened: ConnectedPoint) { NetworkBehaviour::inject_replaced(&mut self.kademlia, peer_id, closed, opened) } fn inject_node_event( &mut self, peer_id: PeerId, event: ::OutEvent, ) { NetworkBehaviour::inject_node_event(&mut self.kademlia, peer_id, event) } fn inject_new_external_addr(&mut self, addr: &Multiaddr) { let new_addr = addr.clone() .with(Protocol::P2p(self.local_peer_id.clone().into())); info!(target: "sub-libp2p", "Discovered new external address for our node: {}", new_addr); } fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { info!(target: "sub-libp2p", "No longer listening on {}", addr); } fn poll( &mut self, params: &mut PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent, Self::OutEvent, >, > { // Poll Kademlia. match self.kademlia.poll(params) { Async::Ready(action) => return Async::Ready(action), Async::NotReady => (), } // Poll the stream that fires when we need to start a random Kademlia query. loop { match self.next_kad_random_query.poll() { Ok(Async::NotReady) => break, Ok(Async::Ready(_)) => { let random_peer_id = PeerId::random(); debug!(target: "sub-libp2p", "Libp2p <= Starting random Kademlia request for \ {:?}", random_peer_id); self.kademlia.find_node(random_peer_id); // Reset the `Delay` to the next random. self.next_kad_random_query.reset(self.clock.now() + self.duration_to_next_kad); self.duration_to_next_kad = cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60)); }, Err(err) => { warn!(target: "sub-libp2p", "Kademlia query timer errored: {:?}", err); break } } } Async::NotReady } }