diff --git a/substrate/core/network-libp2p/src/behaviour.rs b/substrate/core/network-libp2p/src/behaviour.rs index 196988b4f6..5cbad3208d 100644 --- a/substrate/core/network-libp2p/src/behaviour.rs +++ b/substrate/core/network-libp2p/src/behaviour.rs @@ -14,10 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::custom_proto::{CustomProto, CustomProtoOut, RegisteredProtocol}; +use crate::DiscoveryNetBehaviour; use futures::prelude::*; use libp2p::NetworkBehaviour; -use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey}; +use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, protocols_handler::IntoProtocolsHandler, PublicKey}; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters}; #[cfg(not(target_os = "unknown"))] @@ -29,19 +29,19 @@ use libp2p::mdns::{Mdns, MdnsEvent}; use libp2p::multiaddr::Protocol; use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess}; use log::{debug, info, trace, warn}; -use std::{borrow::Cow, cmp, time::Duration}; +use std::{cmp, iter, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::{Delay, clock::Clock}; use void; /// General behaviour of the network. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourOut", poll_method = "poll")] -pub struct Behaviour { +#[behaviour(out_event = "BehaviourOut", poll_method = "poll")] +pub struct Behaviour { + /// Main protocol that handles everything except the discovery and the technicalities. + user_protocol: UserBehaviourWrap, /// Periodically ping nodes, and close the connection if it's unresponsive. ping: Ping, - /// Custom protocols (dot, bbq, sub, etc.). - custom_protocols: CustomProto, /// Discovers nodes of the network. Defined below. discovery: DiscoveryBehaviour, /// Periodically identifies the remote and responds to incoming requests. @@ -52,17 +52,16 @@ pub struct Behaviour { /// Queue of events to produce for the outside. #[behaviour(ignore)] - events: Vec>, + events: Vec>, } -impl Behaviour { +impl Behaviour { /// Builds a new `Behaviour`. pub fn new( + user_protocol: TBehaviour, user_agent: String, local_public_key: PublicKey, - protocol: RegisteredProtocol, known_addresses: Vec<(PeerId, Multiaddr)>, - peerset: substrate_peerset::Peerset, enable_mdns: bool, ) -> Self { let identify = { @@ -70,8 +69,6 @@ impl Behaviour { Identify::new(proto_version, user_agent, local_public_key.clone()) }; - let custom_protocols = CustomProto::new(protocol, peerset); - let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id()); for (peer_id, addr) in &known_addresses { kademlia.add_connected_address(peer_id, addr.clone()); @@ -84,8 +81,8 @@ impl Behaviour { let clock = Clock::new(); Behaviour { + user_protocol: UserBehaviourWrap(user_protocol), ping: Ping::new(PingConfig::new()), - custom_protocols, discovery: DiscoveryBehaviour { user_defined: known_addresses, kademlia, @@ -111,32 +108,11 @@ impl Behaviour { } } - /// Sends a message to a peer. - /// - /// Has no effect if the custom protocol is not open with the given peer. - /// - /// Also note that even we have a valid open substream, it may in fact be already closed - /// without us knowing, in which case the packet will not be received. - #[inline] - pub fn send_custom_message(&mut self, target: &PeerId, data: TMessage) { - self.custom_protocols.send_packet(target, data) - } - /// Returns the list of nodes that we know exist in the network. pub fn known_peers(&self) -> impl Iterator { self.discovery.kademlia.kbuckets_entries() } - /// Returns true if we try to open protocols with the given peer. - pub fn is_enabled(&self, peer_id: &PeerId) -> bool { - self.custom_protocols.is_enabled(peer_id) - } - - /// Returns true if we have an open protocol with the given peer. - pub fn is_open(&self, peer_id: &PeerId) -> bool { - self.custom_protocols.is_open(peer_id) - } - /// Adds a hard-coded address for the given peer, that never expires. pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { if self.discovery.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) { @@ -144,62 +120,22 @@ impl Behaviour { } } - /// Disconnects the custom protocols from a peer. - /// - /// The peer will still be able to use Kademlia or other protocols, but will get disconnected - /// after a few seconds of inactivity. - /// - /// This is asynchronous and does not instantly close the custom protocols. - /// Corresponding closing events will be generated once the closing actually happens. - /// - /// Has no effect if we're not connected to the `PeerId`. - #[inline] - pub fn drop_node(&mut self, peer_id: &PeerId) { - self.custom_protocols.disconnect_peer(peer_id) + /// Returns a shared reference to the user protocol. + pub fn user_protocol(&self) -> &TBehaviour { + &self.user_protocol.0 } - /// Returns the state of the peerset manager, for debugging purposes. - pub fn peerset_debug_info(&mut self) -> serde_json::Value { - self.custom_protocols.peerset_debug_info() + /// Returns a mutable reference to the user protocol. + pub fn user_protocol_mut(&mut self) -> &mut TBehaviour { + &mut self.user_protocol.0 } } /// Event that can be emitted by the behaviour. #[derive(Debug)] -pub enum BehaviourOut { - /// Opened a custom protocol with the remote. - CustomProtocolOpen { - /// Version of the protocol that has been opened. - version: u8, - /// Id of the node we have opened a connection with. - peer_id: PeerId, - /// Endpoint used for this custom protocol. - endpoint: ConnectedPoint, - }, - - /// Closed a custom protocol with the remote. - CustomProtocolClosed { - /// Id of the peer we were connected to. - peer_id: PeerId, - /// Reason why the substream closed, for diagnostic purposes. - reason: Cow<'static, str>, - }, - - /// Receives a message on a custom protocol substream. - CustomMessage { - /// Id of the peer the message came from. - peer_id: PeerId, - /// Message that has been received. - message: TMessage, - }, - - /// A substream with a remote is clogged. We should avoid sending more data to it if possible. - Clogged { - /// Id of the peer the message came from. - peer_id: PeerId, - /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec, - }, +pub enum BehaviourOut { + /// Message from the user protocol. + UserProtocol(TBehaviourEv), /// We have obtained debug information from a peer. Identified { @@ -218,38 +154,21 @@ pub enum BehaviourOut { }, } -impl From> for BehaviourOut { - fn from(other: CustomProtoOut) -> BehaviourOut { - match other { - CustomProtoOut::CustomProtocolOpen { version, peer_id, endpoint } => { - BehaviourOut::CustomProtocolOpen { version, peer_id, endpoint } - } - CustomProtoOut::CustomProtocolClosed { peer_id, reason } => { - BehaviourOut::CustomProtocolClosed { peer_id, reason } - } - CustomProtoOut::CustomMessage { peer_id, message } => { - BehaviourOut::CustomMessage { peer_id, message } - } - CustomProtoOut::Clogged { peer_id, messages } => { - BehaviourOut::Clogged { peer_id, messages } - } - } - } -} - -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: void::Void) { void::unreachable(event) } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: CustomProtoOut) { - self.events.push(event.into()); +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: UserEventWrap) { + self.events.push(BehaviourOut::UserProtocol(event.0)); } } -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess + for Behaviour + where TBehaviour: DiscoveryNetBehaviour { fn inject_event(&mut self, event: IdentifyEvent) { match event { IdentifyEvent::Identified { peer_id, mut info, .. } => { @@ -270,7 +189,7 @@ impl NetworkBehaviourEventProcess for Behav for addr in &info.listen_addrs { self.discovery.kademlia.add_connected_address(&peer_id, addr.clone()); } - self.custom_protocols.add_discovered_nodes(Some(peer_id.clone())); + self.user_protocol.0.add_discovered_nodes(iter::once(peer_id.clone())); self.events.push(BehaviourOut::Identified { peer_id, info }); } IdentifyEvent::Error { .. } => {} @@ -282,12 +201,14 @@ impl NetworkBehaviourEventProcess for Behav } } -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess + for Behaviour + where TBehaviour: DiscoveryNetBehaviour { fn inject_event(&mut self, out: KademliaOut) { match out { KademliaOut::Discovered { .. } => {} KademliaOut::KBucketAdded { peer_id, .. } => { - self.custom_protocols.add_discovered_nodes(Some(peer_id)); + self.user_protocol.0.add_discovered_nodes(iter::once(peer_id)); } KademliaOut::FindNodeResult { key, closer_peers } => { trace!(target: "sub-libp2p", "Libp2p => Query for {:?} yielded {:?} results", @@ -303,7 +224,7 @@ impl NetworkBehaviourEventProcess for Behavio } } -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: PingEvent) { match event { PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { @@ -316,19 +237,21 @@ impl NetworkBehaviourEventProcess for Behaviour } #[cfg(not(target_os = "unknown"))] -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess for + Behaviour + where TBehaviour: DiscoveryNetBehaviour { fn inject_event(&mut self, event: MdnsEvent) { match event { MdnsEvent::Discovered(list) => { - self.custom_protocols.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id)); + self.user_protocol.0.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id)); }, MdnsEvent::Expired(_) => {} } } } -impl Behaviour { - fn poll(&mut self) -> Async>> { +impl Behaviour { + fn poll(&mut self) -> Async>> { if !self.events.is_empty() { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))) } @@ -337,6 +260,69 @@ impl Behaviour { } } +/// Because of limitations with the network behaviour custom derive and trait impl duplication, we +/// have to wrap the user protocol into a struct. +pub struct UserBehaviourWrap(TInner); +/// Event produced by `UserBehaviourWrap`. +pub struct UserEventWrap(TInner); +impl NetworkBehaviour for UserBehaviourWrap { + type ProtocolsHandler = TInner::ProtocolsHandler; + type OutEvent = UserEventWrap; + fn new_handler(&mut self) -> Self::ProtocolsHandler { self.0.new_handler() } + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + self.0.addresses_of_peer(peer_id) + } + fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + self.0.inject_connected(peer_id, endpoint) + } + fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { + self.0.inject_disconnected(peer_id, endpoint) + } + fn inject_node_event( + &mut self, + peer_id: PeerId, + event: <::Handler as ProtocolsHandler>::OutEvent + ) { + self.0.inject_node_event(peer_id, event) + } + fn poll( + &mut self, + params: &mut PollParameters + ) -> Async::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>> { + match self.0.poll(params) { + Async::NotReady => Async::NotReady, + Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => + Async::Ready(NetworkBehaviourAction::GenerateEvent(UserEventWrap(ev))), + Async::Ready(NetworkBehaviourAction::DialAddress { address }) => + Async::Ready(NetworkBehaviourAction::DialAddress { address }), + Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => + Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => + Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }), + Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => + Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), + } + } + fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { + self.0.inject_replaced(peer_id, closed_endpoint, new_endpoint) + } + fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) { + self.0.inject_addr_reach_failure(peer_id, addr, error) + } + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + self.0.inject_dial_failure(peer_id) + } + fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { + self.0.inject_new_listen_addr(addr) + } + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + self.0.inject_expired_listen_addr(addr) + } + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + self.0.inject_new_external_addr(addr) + } +} + /// Implementation of `NetworkBehaviour` that discovers the nodes on the network. pub struct DiscoveryBehaviour { /// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and diff --git a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs index 590ba42d95..a4a4f4aff5 100644 --- a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +use crate::DiscoveryNetBehaviour; use crate::custom_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn}; use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol}; use fnv::FnvHashMap; @@ -348,14 +349,6 @@ impl CustomProto { }); } - /// Indicates to the peerset that we have discovered new addresses for a given node. - pub fn add_discovered_nodes>(&mut self, peer_ids: I) { - self.peerset.discovered(peer_ids.into_iter().map(|peer_id| { - debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id); - peer_id - })); - } - /// Returns the state of the peerset manager, for debugging purposes. pub fn peerset_debug_info(&mut self) -> serde_json::Value { self.peerset.debug_info() @@ -595,6 +588,15 @@ impl CustomProto { } } +impl DiscoveryNetBehaviour for CustomProto { + fn add_discovered_nodes(&mut self, peer_ids: impl Iterator) { + self.peerset.discovered(peer_ids.into_iter().map(|peer_id| { + debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id); + peer_id + })); + } +} + impl NetworkBehaviour for CustomProto where TSubstream: AsyncRead + AsyncWrite, diff --git a/substrate/core/network-libp2p/src/lib.rs b/substrate/core/network-libp2p/src/lib.rs index 6e4b2e4784..6eb0748358 100644 --- a/substrate/core/network-libp2p/src/lib.rs +++ b/substrate/core/network-libp2p/src/lib.rs @@ -57,9 +57,9 @@ //! //! Please keep in mind that the state of the [`Service`] only updates itself in a way //! corresponding to the [`ServiceEvent`] that `poll` returns. -//! +//! //! Illustration: -//! +//! //! - You call [`Service::connected_peers`] to get the list of nodes we are connected to. //! - If you then call [`Service::connected_peers`] again, the returned list will always be the //! same, no matter what happened on the wire. @@ -119,6 +119,17 @@ use serde::{Deserialize, Serialize}; use slog_derive::SerdeValue; use std::{collections::{HashMap, HashSet}, error, fmt, time::Duration}; +/// Extension trait for `NetworkBehaviour` that also accepts discovering nodes. +pub trait DiscoveryNetBehaviour { + /// Notify the protocol that we have learned about the existence of nodes. + /// + /// Can (or most likely will) be called multiple times with the same `PeerId`s. + /// + /// Also note that there is no notification for expired nodes. The implementer must add a TTL + /// system, or remove nodes that will fail to reach. + fn add_discovered_nodes(&mut self, nodes: impl Iterator); +} + /// Name of a protocol, transmitted on the wire. Should be unique for each chain. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ProtocolId(smallvec::SmallVec<[u8; 6]>); diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs index af05d92092..18ccb5f139 100644 --- a/substrate/core/network-libp2p/src/service_task.rs +++ b/substrate/core/network-libp2p/src/service_task.rs @@ -18,7 +18,7 @@ use crate::{ behaviour::Behaviour, behaviour::BehaviourOut, transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer }; -use crate::custom_proto::{CustomMessage, RegisteredProtocol}; +use crate::custom_proto::{CustomProto, CustomProtoOut, CustomMessage, RegisteredProtocol}; use crate::{NetworkConfiguration, NonReservedPeerMode, parse_str_addr}; use fnv::FnvHashMap; use futures::{prelude::*, Stream}; @@ -89,7 +89,8 @@ where TMessage: CustomMessage + Send + 'static { // Build the swarm. let (mut swarm, bandwidth) = { let user_agent = format!("{} ({})", config.client_version, config.node_name); - let behaviour = Behaviour::new(user_agent, local_public, registered_custom, known_addresses, peerset, config.enable_mdns); + let proto = CustomProto::new(registered_custom, peerset); + let behaviour = Behaviour::new(proto, user_agent, local_public, known_addresses, config.enable_mdns); let (transport, bandwidth) = transport::build_transport(local_identity); (Swarm::new(transport, behaviour, local_peer_id.clone()), bandwidth) }; @@ -157,7 +158,7 @@ pub enum ServiceEvent { /// Network service. Must be polled regularly in order for the networking to work. pub struct Service where TMessage: CustomMessage { /// Stream of events of the swarm. - swarm: Swarm, Behaviour>>, + swarm: Swarm, Behaviour>, CustomProtoOut, Substream>>, /// Bandwidth logging system. Can be queried to know the average bandwidth consumed. bandwidth: Arc, @@ -194,8 +195,8 @@ where TMessage: CustomMessage + Send + 'static { endpoint: info.endpoint.clone().into(), version_string: info.client_version.clone(), latest_ping_time: info.latest_ping, - enabled: swarm.is_enabled(&peer_id), - open: swarm.is_open(&peer_id), + enabled: swarm.user_protocol().is_enabled(&peer_id), + open: swarm.user_protocol().is_open(&peer_id), known_addresses, }) }).collect() @@ -222,7 +223,7 @@ where TMessage: CustomMessage + Send + 'static { average_upload_per_sec: self.bandwidth.average_upload_per_sec(), connected_peers, not_connected_peers, - peerset: self.swarm.peerset_debug_info(), + peerset: self.swarm.user_protocol_mut().peerset_debug_info(), } } @@ -277,7 +278,7 @@ where TMessage: CustomMessage + Send + 'static { peer_id: &PeerId, message: TMessage ) { - self.swarm.send_custom_message(peer_id, message); + self.swarm.user_protocol_mut().send_packet(peer_id, message); } /// Disconnects a peer. @@ -288,7 +289,7 @@ where TMessage: CustomMessage + Send + 'static { if let Some(info) = self.nodes_info.get(peer_id) { debug!(target: "sub-libp2p", "Dropping {:?} on purpose ({:?}, {:?})", peer_id, info.endpoint, info.client_version); - self.swarm.drop_node(peer_id); + self.swarm.user_protocol_mut().disconnect_peer(peer_id); } } @@ -310,7 +311,7 @@ where TMessage: CustomMessage + Send + 'static { fn poll_swarm(&mut self) -> Poll>, IoError> { loop { match self.swarm.poll() { - Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { peer_id, version, endpoint }))) => { + Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::CustomProtocolOpen { peer_id, version, endpoint })))) => { self.nodes_info.insert(peer_id.clone(), NodeInfo { endpoint, client_version: None, @@ -323,7 +324,7 @@ where TMessage: CustomMessage + Send + 'static { debug_info, }))) } - Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { peer_id, .. }))) => { + Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::CustomProtocolClosed { peer_id, .. })))) => { let debug_info = self.peer_debug_info(&peer_id); self.nodes_info.remove(&peer_id); break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol { @@ -331,13 +332,13 @@ where TMessage: CustomMessage + Send + 'static { debug_info, }))) } - Ok(Async::Ready(Some(BehaviourOut::CustomMessage { peer_id, message }))) => { + Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::CustomMessage { peer_id, message })))) => { break Ok(Async::Ready(Some(ServiceEvent::CustomMessage { peer_id, message, }))) } - Ok(Async::Ready(Some(BehaviourOut::Clogged { peer_id, messages }))) => { + Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::Clogged { peer_id, messages })))) => { break Ok(Async::Ready(Some(ServiceEvent::Clogged { peer_id, messages,