mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 11:51:12 +00:00
Make the behaviour in libp2p generic (#2525)
* Make the behaviour in libp2p generic * Fix indentation * Fix bad merge
This commit is contained in:
committed by
Gavin Wood
parent
2724cdac33
commit
d974189e3c
@@ -14,10 +14,10 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::custom_proto::{CustomProto, CustomProtoOut, RegisteredProtocol};
|
||||
use crate::DiscoveryNetBehaviour;
|
||||
use futures::prelude::*;
|
||||
use libp2p::NetworkBehaviour;
|
||||
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey};
|
||||
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, protocols_handler::IntoProtocolsHandler, PublicKey};
|
||||
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
|
||||
use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters};
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
@@ -29,19 +29,19 @@ use libp2p::mdns::{Mdns, MdnsEvent};
|
||||
use libp2p::multiaddr::Protocol;
|
||||
use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess};
|
||||
use log::{debug, info, trace, warn};
|
||||
use std::{borrow::Cow, cmp, time::Duration};
|
||||
use std::{cmp, iter, time::Duration};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_timer::{Delay, clock::Clock};
|
||||
use void;
|
||||
|
||||
/// General behaviour of the network.
|
||||
#[derive(NetworkBehaviour)]
|
||||
#[behaviour(out_event = "BehaviourOut<TMessage>", poll_method = "poll")]
|
||||
pub struct Behaviour<TMessage, TSubstream> {
|
||||
#[behaviour(out_event = "BehaviourOut<TBehaviourEv>", poll_method = "poll")]
|
||||
pub struct Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||
/// Main protocol that handles everything except the discovery and the technicalities.
|
||||
user_protocol: UserBehaviourWrap<TBehaviour>,
|
||||
/// Periodically ping nodes, and close the connection if it's unresponsive.
|
||||
ping: Ping<TSubstream>,
|
||||
/// Custom protocols (dot, bbq, sub, etc.).
|
||||
custom_protocols: CustomProto<TMessage, TSubstream>,
|
||||
/// Discovers nodes of the network. Defined below.
|
||||
discovery: DiscoveryBehaviour<TSubstream>,
|
||||
/// Periodically identifies the remote and responds to incoming requests.
|
||||
@@ -52,17 +52,16 @@ pub struct Behaviour<TMessage, TSubstream> {
|
||||
|
||||
/// Queue of events to produce for the outside.
|
||||
#[behaviour(ignore)]
|
||||
events: Vec<BehaviourOut<TMessage>>,
|
||||
events: Vec<BehaviourOut<TBehaviourEv>>,
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
||||
impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||
/// Builds a new `Behaviour`.
|
||||
pub fn new(
|
||||
user_protocol: TBehaviour,
|
||||
user_agent: String,
|
||||
local_public_key: PublicKey,
|
||||
protocol: RegisteredProtocol<TMessage>,
|
||||
known_addresses: Vec<(PeerId, Multiaddr)>,
|
||||
peerset: substrate_peerset::Peerset,
|
||||
enable_mdns: bool,
|
||||
) -> Self {
|
||||
let identify = {
|
||||
@@ -70,8 +69,6 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
||||
Identify::new(proto_version, user_agent, local_public_key.clone())
|
||||
};
|
||||
|
||||
let custom_protocols = CustomProto::new(protocol, peerset);
|
||||
|
||||
let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id());
|
||||
for (peer_id, addr) in &known_addresses {
|
||||
kademlia.add_connected_address(peer_id, addr.clone());
|
||||
@@ -84,8 +81,8 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
||||
|
||||
let clock = Clock::new();
|
||||
Behaviour {
|
||||
user_protocol: UserBehaviourWrap(user_protocol),
|
||||
ping: Ping::new(PingConfig::new()),
|
||||
custom_protocols,
|
||||
discovery: DiscoveryBehaviour {
|
||||
user_defined: known_addresses,
|
||||
kademlia,
|
||||
@@ -111,32 +108,11 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a message to a peer.
|
||||
///
|
||||
/// Has no effect if the custom protocol is not open with the given peer.
|
||||
///
|
||||
/// Also note that even we have a valid open substream, it may in fact be already closed
|
||||
/// without us knowing, in which case the packet will not be received.
|
||||
#[inline]
|
||||
pub fn send_custom_message(&mut self, target: &PeerId, data: TMessage) {
|
||||
self.custom_protocols.send_packet(target, data)
|
||||
}
|
||||
|
||||
/// Returns the list of nodes that we know exist in the network.
|
||||
pub fn known_peers(&self) -> impl Iterator<Item = &PeerId> {
|
||||
self.discovery.kademlia.kbuckets_entries()
|
||||
}
|
||||
|
||||
/// Returns true if we try to open protocols with the given peer.
|
||||
pub fn is_enabled(&self, peer_id: &PeerId) -> bool {
|
||||
self.custom_protocols.is_enabled(peer_id)
|
||||
}
|
||||
|
||||
/// Returns true if we have an open protocol with the given peer.
|
||||
pub fn is_open(&self, peer_id: &PeerId) -> bool {
|
||||
self.custom_protocols.is_open(peer_id)
|
||||
}
|
||||
|
||||
/// Adds a hard-coded address for the given peer, that never expires.
|
||||
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
|
||||
if self.discovery.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) {
|
||||
@@ -144,62 +120,22 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Disconnects the custom protocols from a peer.
|
||||
///
|
||||
/// The peer will still be able to use Kademlia or other protocols, but will get disconnected
|
||||
/// after a few seconds of inactivity.
|
||||
///
|
||||
/// This is asynchronous and does not instantly close the custom protocols.
|
||||
/// Corresponding closing events will be generated once the closing actually happens.
|
||||
///
|
||||
/// Has no effect if we're not connected to the `PeerId`.
|
||||
#[inline]
|
||||
pub fn drop_node(&mut self, peer_id: &PeerId) {
|
||||
self.custom_protocols.disconnect_peer(peer_id)
|
||||
/// Returns a shared reference to the user protocol.
|
||||
pub fn user_protocol(&self) -> &TBehaviour {
|
||||
&self.user_protocol.0
|
||||
}
|
||||
|
||||
/// Returns the state of the peerset manager, for debugging purposes.
|
||||
pub fn peerset_debug_info(&mut self) -> serde_json::Value {
|
||||
self.custom_protocols.peerset_debug_info()
|
||||
/// Returns a mutable reference to the user protocol.
|
||||
pub fn user_protocol_mut(&mut self) -> &mut TBehaviour {
|
||||
&mut self.user_protocol.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Event that can be emitted by the behaviour.
|
||||
#[derive(Debug)]
|
||||
pub enum BehaviourOut<TMessage> {
|
||||
/// Opened a custom protocol with the remote.
|
||||
CustomProtocolOpen {
|
||||
/// Version of the protocol that has been opened.
|
||||
version: u8,
|
||||
/// Id of the node we have opened a connection with.
|
||||
peer_id: PeerId,
|
||||
/// Endpoint used for this custom protocol.
|
||||
endpoint: ConnectedPoint,
|
||||
},
|
||||
|
||||
/// Closed a custom protocol with the remote.
|
||||
CustomProtocolClosed {
|
||||
/// Id of the peer we were connected to.
|
||||
peer_id: PeerId,
|
||||
/// Reason why the substream closed, for diagnostic purposes.
|
||||
reason: Cow<'static, str>,
|
||||
},
|
||||
|
||||
/// Receives a message on a custom protocol substream.
|
||||
CustomMessage {
|
||||
/// Id of the peer the message came from.
|
||||
peer_id: PeerId,
|
||||
/// Message that has been received.
|
||||
message: TMessage,
|
||||
},
|
||||
|
||||
/// A substream with a remote is clogged. We should avoid sending more data to it if possible.
|
||||
Clogged {
|
||||
/// Id of the peer the message came from.
|
||||
peer_id: PeerId,
|
||||
/// Copy of the messages that are within the buffer, for further diagnostic.
|
||||
messages: Vec<TMessage>,
|
||||
},
|
||||
pub enum BehaviourOut<TBehaviourEv> {
|
||||
/// Message from the user protocol.
|
||||
UserProtocol(TBehaviourEv),
|
||||
|
||||
/// We have obtained debug information from a peer.
|
||||
Identified {
|
||||
@@ -218,38 +154,21 @@ pub enum BehaviourOut<TMessage> {
|
||||
},
|
||||
}
|
||||
|
||||
impl<TMessage> From<CustomProtoOut<TMessage>> for BehaviourOut<TMessage> {
|
||||
fn from(other: CustomProtoOut<TMessage>) -> BehaviourOut<TMessage> {
|
||||
match other {
|
||||
CustomProtoOut::CustomProtocolOpen { version, peer_id, endpoint } => {
|
||||
BehaviourOut::CustomProtocolOpen { version, peer_id, endpoint }
|
||||
}
|
||||
CustomProtoOut::CustomProtocolClosed { peer_id, reason } => {
|
||||
BehaviourOut::CustomProtocolClosed { peer_id, reason }
|
||||
}
|
||||
CustomProtoOut::CustomMessage { peer_id, message } => {
|
||||
BehaviourOut::CustomMessage { peer_id, message }
|
||||
}
|
||||
CustomProtoOut::Clogged { peer_id, messages } => {
|
||||
BehaviourOut::Clogged { peer_id, messages }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviour<TMessage, TSubstream> {
|
||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||
fn inject_event(&mut self, event: void::Void) {
|
||||
void::unreachable(event)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<CustomProtoOut<TMessage>> for Behaviour<TMessage, TSubstream> {
|
||||
fn inject_event(&mut self, event: CustomProtoOut<TMessage>) {
|
||||
self.events.push(event.into());
|
||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<UserEventWrap<TBehaviourEv>> for Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||
fn inject_event(&mut self, event: UserEventWrap<TBehaviourEv>) {
|
||||
self.events.push(BehaviourOut::UserProtocol(event.0));
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TMessage, TSubstream> {
|
||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent>
|
||||
for Behaviour<TBehaviour, TBehaviourEv, TSubstream>
|
||||
where TBehaviour: DiscoveryNetBehaviour {
|
||||
fn inject_event(&mut self, event: IdentifyEvent) {
|
||||
match event {
|
||||
IdentifyEvent::Identified { peer_id, mut info, .. } => {
|
||||
@@ -270,7 +189,7 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behav
|
||||
for addr in &info.listen_addrs {
|
||||
self.discovery.kademlia.add_connected_address(&peer_id, addr.clone());
|
||||
}
|
||||
self.custom_protocols.add_discovered_nodes(Some(peer_id.clone()));
|
||||
self.user_protocol.0.add_discovered_nodes(iter::once(peer_id.clone()));
|
||||
self.events.push(BehaviourOut::Identified { peer_id, info });
|
||||
}
|
||||
IdentifyEvent::Error { .. } => {}
|
||||
@@ -282,12 +201,14 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behav
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behaviour<TMessage, TSubstream> {
|
||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<KademliaOut>
|
||||
for Behaviour<TBehaviour, TBehaviourEv, TSubstream>
|
||||
where TBehaviour: DiscoveryNetBehaviour {
|
||||
fn inject_event(&mut self, out: KademliaOut) {
|
||||
match out {
|
||||
KademliaOut::Discovered { .. } => {}
|
||||
KademliaOut::KBucketAdded { peer_id, .. } => {
|
||||
self.custom_protocols.add_discovered_nodes(Some(peer_id));
|
||||
self.user_protocol.0.add_discovered_nodes(iter::once(peer_id));
|
||||
}
|
||||
KademliaOut::FindNodeResult { key, closer_peers } => {
|
||||
trace!(target: "sub-libp2p", "Libp2p => Query for {:?} yielded {:?} results",
|
||||
@@ -303,7 +224,7 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behavio
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TMessage, TSubstream> {
|
||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||
fn inject_event(&mut self, event: PingEvent) {
|
||||
match event {
|
||||
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
|
||||
@@ -316,19 +237,21 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<MdnsEvent> for Behaviour<TMessage, TSubstream> {
|
||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<MdnsEvent> for
|
||||
Behaviour<TBehaviour, TBehaviourEv, TSubstream>
|
||||
where TBehaviour: DiscoveryNetBehaviour {
|
||||
fn inject_event(&mut self, event: MdnsEvent) {
|
||||
match event {
|
||||
MdnsEvent::Discovered(list) => {
|
||||
self.custom_protocols.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id));
|
||||
self.user_protocol.0.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id));
|
||||
},
|
||||
MdnsEvent::Expired(_) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
||||
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut<TMessage>>> {
|
||||
impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut<TBehaviourEv>>> {
|
||||
if !self.events.is_empty() {
|
||||
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
|
||||
}
|
||||
@@ -337,6 +260,69 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Because of limitations with the network behaviour custom derive and trait impl duplication, we
|
||||
/// have to wrap the user protocol into a struct.
|
||||
pub struct UserBehaviourWrap<TInner>(TInner);
|
||||
/// Event produced by `UserBehaviourWrap`.
|
||||
pub struct UserEventWrap<TInner>(TInner);
|
||||
impl<TInner: NetworkBehaviour> NetworkBehaviour for UserBehaviourWrap<TInner> {
|
||||
type ProtocolsHandler = TInner::ProtocolsHandler;
|
||||
type OutEvent = UserEventWrap<TInner::OutEvent>;
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler { self.0.new_handler() }
|
||||
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||
self.0.addresses_of_peer(peer_id)
|
||||
}
|
||||
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
|
||||
self.0.inject_connected(peer_id, endpoint)
|
||||
}
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
|
||||
self.0.inject_disconnected(peer_id, endpoint)
|
||||
}
|
||||
fn inject_node_event(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
|
||||
) {
|
||||
self.0.inject_node_event(peer_id, event)
|
||||
}
|
||||
fn poll(
|
||||
&mut self,
|
||||
params: &mut PollParameters
|
||||
) -> Async<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>> {
|
||||
match self.0.poll(params) {
|
||||
Async::NotReady => Async::NotReady,
|
||||
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) =>
|
||||
Async::Ready(NetworkBehaviourAction::GenerateEvent(UserEventWrap(ev))),
|
||||
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
|
||||
Async::Ready(NetworkBehaviourAction::DialAddress { address }),
|
||||
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
|
||||
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
|
||||
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
|
||||
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }),
|
||||
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
|
||||
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
|
||||
}
|
||||
}
|
||||
fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
|
||||
self.0.inject_replaced(peer_id, closed_endpoint, new_endpoint)
|
||||
}
|
||||
fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) {
|
||||
self.0.inject_addr_reach_failure(peer_id, addr, error)
|
||||
}
|
||||
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
|
||||
self.0.inject_dial_failure(peer_id)
|
||||
}
|
||||
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
|
||||
self.0.inject_new_listen_addr(addr)
|
||||
}
|
||||
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
|
||||
self.0.inject_expired_listen_addr(addr)
|
||||
}
|
||||
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
||||
self.0.inject_new_external_addr(addr)
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
|
||||
pub struct DiscoveryBehaviour<TSubstream> {
|
||||
/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::DiscoveryNetBehaviour;
|
||||
use crate::custom_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn};
|
||||
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol};
|
||||
use fnv::FnvHashMap;
|
||||
@@ -348,14 +349,6 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
|
||||
});
|
||||
}
|
||||
|
||||
/// Indicates to the peerset that we have discovered new addresses for a given node.
|
||||
pub fn add_discovered_nodes<I: IntoIterator<Item = PeerId>>(&mut self, peer_ids: I) {
|
||||
self.peerset.discovered(peer_ids.into_iter().map(|peer_id| {
|
||||
debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id);
|
||||
peer_id
|
||||
}));
|
||||
}
|
||||
|
||||
/// Returns the state of the peerset manager, for debugging purposes.
|
||||
pub fn peerset_debug_info(&mut self) -> serde_json::Value {
|
||||
self.peerset.debug_info()
|
||||
@@ -595,6 +588,15 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> DiscoveryNetBehaviour for CustomProto<TMessage, TSubstream> {
|
||||
fn add_discovered_nodes(&mut self, peer_ids: impl Iterator<Item = PeerId>) {
|
||||
self.peerset.discovered(peer_ids.into_iter().map(|peer_id| {
|
||||
debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id);
|
||||
peer_id
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> NetworkBehaviour for CustomProto<TMessage, TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
|
||||
@@ -57,9 +57,9 @@
|
||||
//!
|
||||
//! Please keep in mind that the state of the [`Service`] only updates itself in a way
|
||||
//! corresponding to the [`ServiceEvent`] that `poll` returns.
|
||||
//!
|
||||
//!
|
||||
//! Illustration:
|
||||
//!
|
||||
//!
|
||||
//! - You call [`Service::connected_peers`] to get the list of nodes we are connected to.
|
||||
//! - If you then call [`Service::connected_peers`] again, the returned list will always be the
|
||||
//! same, no matter what happened on the wire.
|
||||
@@ -119,6 +119,17 @@ use serde::{Deserialize, Serialize};
|
||||
use slog_derive::SerdeValue;
|
||||
use std::{collections::{HashMap, HashSet}, error, fmt, time::Duration};
|
||||
|
||||
/// Extension trait for `NetworkBehaviour` that also accepts discovering nodes.
|
||||
pub trait DiscoveryNetBehaviour {
|
||||
/// Notify the protocol that we have learned about the existence of nodes.
|
||||
///
|
||||
/// Can (or most likely will) be called multiple times with the same `PeerId`s.
|
||||
///
|
||||
/// Also note that there is no notification for expired nodes. The implementer must add a TTL
|
||||
/// system, or remove nodes that will fail to reach.
|
||||
fn add_discovered_nodes(&mut self, nodes: impl Iterator<Item = PeerId>);
|
||||
}
|
||||
|
||||
/// Name of a protocol, transmitted on the wire. Should be unique for each chain.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct ProtocolId(smallvec::SmallVec<[u8; 6]>);
|
||||
|
||||
@@ -18,7 +18,7 @@ use crate::{
|
||||
behaviour::Behaviour, behaviour::BehaviourOut,
|
||||
transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer
|
||||
};
|
||||
use crate::custom_proto::{CustomMessage, RegisteredProtocol};
|
||||
use crate::custom_proto::{CustomProto, CustomProtoOut, CustomMessage, RegisteredProtocol};
|
||||
use crate::{NetworkConfiguration, NonReservedPeerMode, parse_str_addr};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{prelude::*, Stream};
|
||||
@@ -89,7 +89,8 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
// Build the swarm.
|
||||
let (mut swarm, bandwidth) = {
|
||||
let user_agent = format!("{} ({})", config.client_version, config.node_name);
|
||||
let behaviour = Behaviour::new(user_agent, local_public, registered_custom, known_addresses, peerset, config.enable_mdns);
|
||||
let proto = CustomProto::new(registered_custom, peerset);
|
||||
let behaviour = Behaviour::new(proto, user_agent, local_public, known_addresses, config.enable_mdns);
|
||||
let (transport, bandwidth) = transport::build_transport(local_identity);
|
||||
(Swarm::new(transport, behaviour, local_peer_id.clone()), bandwidth)
|
||||
};
|
||||
@@ -157,7 +158,7 @@ pub enum ServiceEvent<TMessage> {
|
||||
/// Network service. Must be polled regularly in order for the networking to work.
|
||||
pub struct Service<TMessage> where TMessage: CustomMessage {
|
||||
/// Stream of events of the swarm.
|
||||
swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), IoError>, Behaviour<TMessage, Substream<StreamMuxerBox>>>,
|
||||
swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), IoError>, Behaviour<CustomProto<TMessage, Substream<StreamMuxerBox>>, CustomProtoOut<TMessage>, Substream<StreamMuxerBox>>>,
|
||||
|
||||
/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
|
||||
bandwidth: Arc<transport::BandwidthSinks>,
|
||||
@@ -194,8 +195,8 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
endpoint: info.endpoint.clone().into(),
|
||||
version_string: info.client_version.clone(),
|
||||
latest_ping_time: info.latest_ping,
|
||||
enabled: swarm.is_enabled(&peer_id),
|
||||
open: swarm.is_open(&peer_id),
|
||||
enabled: swarm.user_protocol().is_enabled(&peer_id),
|
||||
open: swarm.user_protocol().is_open(&peer_id),
|
||||
known_addresses,
|
||||
})
|
||||
}).collect()
|
||||
@@ -222,7 +223,7 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
average_upload_per_sec: self.bandwidth.average_upload_per_sec(),
|
||||
connected_peers,
|
||||
not_connected_peers,
|
||||
peerset: self.swarm.peerset_debug_info(),
|
||||
peerset: self.swarm.user_protocol_mut().peerset_debug_info(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -277,7 +278,7 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
peer_id: &PeerId,
|
||||
message: TMessage
|
||||
) {
|
||||
self.swarm.send_custom_message(peer_id, message);
|
||||
self.swarm.user_protocol_mut().send_packet(peer_id, message);
|
||||
}
|
||||
|
||||
/// Disconnects a peer.
|
||||
@@ -288,7 +289,7 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
if let Some(info) = self.nodes_info.get(peer_id) {
|
||||
debug!(target: "sub-libp2p", "Dropping {:?} on purpose ({:?}, {:?})",
|
||||
peer_id, info.endpoint, info.client_version);
|
||||
self.swarm.drop_node(peer_id);
|
||||
self.swarm.user_protocol_mut().disconnect_peer(peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -310,7 +311,7 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
fn poll_swarm(&mut self) -> Poll<Option<ServiceEvent<TMessage>>, IoError> {
|
||||
loop {
|
||||
match self.swarm.poll() {
|
||||
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { peer_id, version, endpoint }))) => {
|
||||
Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::CustomProtocolOpen { peer_id, version, endpoint })))) => {
|
||||
self.nodes_info.insert(peer_id.clone(), NodeInfo {
|
||||
endpoint,
|
||||
client_version: None,
|
||||
@@ -323,7 +324,7 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
debug_info,
|
||||
})))
|
||||
}
|
||||
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { peer_id, .. }))) => {
|
||||
Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::CustomProtocolClosed { peer_id, .. })))) => {
|
||||
let debug_info = self.peer_debug_info(&peer_id);
|
||||
self.nodes_info.remove(&peer_id);
|
||||
break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol {
|
||||
@@ -331,13 +332,13 @@ where TMessage: CustomMessage + Send + 'static {
|
||||
debug_info,
|
||||
})))
|
||||
}
|
||||
Ok(Async::Ready(Some(BehaviourOut::CustomMessage { peer_id, message }))) => {
|
||||
Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::CustomMessage { peer_id, message })))) => {
|
||||
break Ok(Async::Ready(Some(ServiceEvent::CustomMessage {
|
||||
peer_id,
|
||||
message,
|
||||
})))
|
||||
}
|
||||
Ok(Async::Ready(Some(BehaviourOut::Clogged { peer_id, messages }))) => {
|
||||
Ok(Async::Ready(Some(BehaviourOut::UserProtocol(CustomProtoOut::Clogged { peer_id, messages })))) => {
|
||||
break Ok(Async::Ready(Some(ServiceEvent::Clogged {
|
||||
peer_id,
|
||||
messages,
|
||||
|
||||
Reference in New Issue
Block a user