mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 16:57:58 +00:00
Use a Kademlia instance per ProtocolId. (#5045)
This commit is contained in:
@@ -16,7 +16,7 @@
|
||||
|
||||
use crate::{
|
||||
config::Role,
|
||||
debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut,
|
||||
debug_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
|
||||
Event, ObservedRole, DhtEvent, ExHashT,
|
||||
};
|
||||
use crate::protocol::{self, light_client_handler, message::Roles, CustomMessageOutcome, Protocol};
|
||||
@@ -67,28 +67,19 @@ pub enum BehaviourOut<B: BlockT> {
|
||||
|
||||
impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
|
||||
/// Builds a new `Behaviour`.
|
||||
pub async fn new(
|
||||
pub fn new(
|
||||
substrate: Protocol<B, H>,
|
||||
role: Role,
|
||||
user_agent: String,
|
||||
local_public_key: PublicKey,
|
||||
known_addresses: Vec<(PeerId, Multiaddr)>,
|
||||
enable_mdns: bool,
|
||||
allow_private_ipv4: bool,
|
||||
discovery_only_if_under_num: u64,
|
||||
block_requests: protocol::BlockRequests<B>,
|
||||
light_client_handler: protocol::LightClientHandler<B>,
|
||||
disco_config: DiscoveryConfig,
|
||||
) -> Self {
|
||||
Behaviour {
|
||||
substrate,
|
||||
debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()),
|
||||
discovery: DiscoveryBehaviour::new(
|
||||
local_public_key,
|
||||
known_addresses,
|
||||
enable_mdns,
|
||||
allow_private_ipv4,
|
||||
discovery_only_if_under_num,
|
||||
).await,
|
||||
discovery: disco_config.finish(),
|
||||
block_requests,
|
||||
light_client_handler,
|
||||
events: Vec::new(),
|
||||
|
||||
@@ -45,30 +45,162 @@
|
||||
//! of a node's address, you must call `add_self_reported_address`.
|
||||
//!
|
||||
|
||||
use crate::config::ProtocolId;
|
||||
use futures::prelude::*;
|
||||
use futures_timer::Delay;
|
||||
use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey};
|
||||
use libp2p::swarm::{ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
|
||||
use libp2p::kad::{Kademlia, KademliaEvent, Quorum, Record};
|
||||
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
|
||||
use libp2p::swarm::protocols_handler::multi::MultiHandler;
|
||||
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, Quorum, Record};
|
||||
use libp2p::kad::GetClosestPeersError;
|
||||
use libp2p::kad::handler::KademliaHandler;
|
||||
use libp2p::kad::QueryId;
|
||||
use libp2p::kad::record::{self, store::MemoryStore};
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
use libp2p::{swarm::toggle::Toggle};
|
||||
use libp2p::swarm::toggle::Toggle;
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
use libp2p::mdns::{Mdns, MdnsEvent};
|
||||
use libp2p::multiaddr::Protocol;
|
||||
use log::{debug, info, trace, warn, error};
|
||||
use std::{cmp, collections::VecDeque, io, time::Duration};
|
||||
use std::{cmp, collections::{HashMap, HashSet, VecDeque}, io, time::Duration};
|
||||
use std::task::{Context, Poll};
|
||||
use sp_core::hexdisplay::HexDisplay;
|
||||
|
||||
/// `DiscoveryBehaviour` configuration.
|
||||
pub struct DiscoveryConfig {
|
||||
local_peer_id: PeerId,
|
||||
user_defined: Vec<(PeerId, Multiaddr)>,
|
||||
allow_private_ipv4: bool,
|
||||
discovery_only_if_under_num: u64,
|
||||
enable_mdns: bool,
|
||||
kademlias: HashMap<ProtocolId, Kademlia<MemoryStore>>
|
||||
}
|
||||
|
||||
impl DiscoveryConfig {
|
||||
/// Crate a default configuration with the given public key.
|
||||
pub fn new(local_public_key: PublicKey) -> Self {
|
||||
let mut this = DiscoveryConfig {
|
||||
local_peer_id: local_public_key.into_peer_id(),
|
||||
user_defined: Vec::new(),
|
||||
allow_private_ipv4: true,
|
||||
discovery_only_if_under_num: std::u64::MAX,
|
||||
enable_mdns: false,
|
||||
kademlias: HashMap::new()
|
||||
};
|
||||
|
||||
// Temporary hack to retain backwards compatibility.
|
||||
// We should eventually remove the special handling of DEFAULT_PROTO_NAME.
|
||||
let proto_id = ProtocolId::from(libp2p::kad::protocol::DEFAULT_PROTO_NAME);
|
||||
let proto_name = Vec::from(proto_id.as_bytes());
|
||||
this.add_kademlia(proto_id, proto_name);
|
||||
|
||||
this
|
||||
}
|
||||
|
||||
/// Set the number of active connections at which we pause discovery.
|
||||
pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
|
||||
self.discovery_only_if_under_num = limit;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set custom nodes which never expire, e.g. bootstrap or reserved nodes.
|
||||
pub fn with_user_defined<I>(&mut self, user_defined: I) -> &mut Self
|
||||
where
|
||||
I: IntoIterator<Item = (PeerId, Multiaddr)>
|
||||
{
|
||||
for (peer_id, addr) in user_defined {
|
||||
for kad in self.kademlias.values_mut() {
|
||||
kad.add_address(&peer_id, addr.clone())
|
||||
}
|
||||
self.user_defined.push((peer_id, addr))
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Should private IPv4 addresses be reported?
|
||||
pub fn allow_private_ipv4(&mut self, value: bool) -> &mut Self {
|
||||
self.allow_private_ipv4 = value;
|
||||
self
|
||||
}
|
||||
|
||||
/// Should MDNS discovery be supported?
|
||||
pub fn with_mdns(&mut self, value: bool) -> &mut Self {
|
||||
if value && cfg!(target_os = "unknown") {
|
||||
log::warn!(target: "sub-libp2p", "mDNS is not available on this platform")
|
||||
}
|
||||
self.enable_mdns = value;
|
||||
self
|
||||
}
|
||||
|
||||
/// Add discovery via Kademlia for the given protocol.
|
||||
pub fn add_protocol(&mut self, p: ProtocolId) -> &mut Self {
|
||||
// NB: If this protocol name derivation is changed, check if
|
||||
// `DiscoveryBehaviour::new_handler` is still correct.
|
||||
let proto_name = {
|
||||
let mut v = vec![b'/'];
|
||||
v.extend_from_slice(p.as_bytes());
|
||||
v.extend_from_slice(b"/kad");
|
||||
v
|
||||
};
|
||||
|
||||
self.add_kademlia(p, proto_name);
|
||||
self
|
||||
}
|
||||
|
||||
fn add_kademlia(&mut self, id: ProtocolId, proto_name: Vec<u8>) {
|
||||
if self.kademlias.contains_key(&id) {
|
||||
warn!(target: "sub-libp2p", "Discovery already registered for protocol {:?}", id);
|
||||
return
|
||||
}
|
||||
|
||||
let mut config = KademliaConfig::default();
|
||||
config.set_protocol_name(proto_name);
|
||||
|
||||
let store = MemoryStore::new(self.local_peer_id.clone());
|
||||
let mut kad = Kademlia::with_config(self.local_peer_id.clone(), store, config);
|
||||
|
||||
for (peer_id, addr) in &self.user_defined {
|
||||
kad.add_address(peer_id, addr.clone());
|
||||
}
|
||||
|
||||
self.kademlias.insert(id, kad);
|
||||
}
|
||||
|
||||
/// Create a `DiscoveryBehaviour` from this config.
|
||||
pub fn finish(self) -> DiscoveryBehaviour {
|
||||
DiscoveryBehaviour {
|
||||
user_defined: self.user_defined,
|
||||
kademlias: self.kademlias,
|
||||
next_kad_random_query: Delay::new(Duration::new(0, 0)),
|
||||
duration_to_next_kad: Duration::from_secs(1),
|
||||
discoveries: VecDeque::new(),
|
||||
local_peer_id: self.local_peer_id,
|
||||
num_connections: 0,
|
||||
allow_private_ipv4: self.allow_private_ipv4,
|
||||
discovery_only_if_under_num: self.discovery_only_if_under_num,
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
mdns: if self.enable_mdns {
|
||||
match Mdns::new() {
|
||||
Ok(mdns) => Some(mdns).into(),
|
||||
Err(err) => {
|
||||
warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
|
||||
None.into()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None.into()
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
|
||||
pub struct DiscoveryBehaviour {
|
||||
/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
|
||||
/// reserved nodes.
|
||||
user_defined: Vec<(PeerId, Multiaddr)>,
|
||||
/// Kademlia requests and answers.
|
||||
kademlia: Kademlia<MemoryStore>,
|
||||
kademlias: HashMap<ProtocolId, Kademlia<MemoryStore>>,
|
||||
/// Discovers nodes on the local network.
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
mdns: Toggle<Mdns>,
|
||||
@@ -90,56 +222,13 @@ pub struct DiscoveryBehaviour {
|
||||
}
|
||||
|
||||
impl DiscoveryBehaviour {
|
||||
/// Builds a new `DiscoveryBehaviour`.
|
||||
///
|
||||
/// `user_defined` is a list of known address for nodes that never expire.
|
||||
pub async fn new(
|
||||
local_public_key: PublicKey,
|
||||
user_defined: Vec<(PeerId, Multiaddr)>,
|
||||
enable_mdns: bool,
|
||||
allow_private_ipv4: bool,
|
||||
discovery_only_if_under_num: u64,
|
||||
) -> Self {
|
||||
if enable_mdns {
|
||||
#[cfg(target_os = "unknown")]
|
||||
warn!(target: "sub-libp2p", "mDNS is not available on this platform");
|
||||
}
|
||||
|
||||
let local_id = local_public_key.clone().into_peer_id();
|
||||
let store = MemoryStore::new(local_id.clone());
|
||||
let mut kademlia = Kademlia::new(local_id.clone(), store);
|
||||
for (peer_id, addr) in &user_defined {
|
||||
kademlia.add_address(peer_id, addr.clone());
|
||||
}
|
||||
|
||||
DiscoveryBehaviour {
|
||||
user_defined,
|
||||
kademlia,
|
||||
next_kad_random_query: Delay::new(Duration::new(0, 0)),
|
||||
duration_to_next_kad: Duration::from_secs(1),
|
||||
discoveries: VecDeque::new(),
|
||||
local_peer_id: local_public_key.into_peer_id(),
|
||||
num_connections: 0,
|
||||
allow_private_ipv4,
|
||||
discovery_only_if_under_num,
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
mdns: if enable_mdns {
|
||||
match Mdns::new() {
|
||||
Ok(mdns) => Some(mdns).into(),
|
||||
Err(err) => {
|
||||
warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
|
||||
None.into()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None.into()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the list of nodes that we know exist in the network.
|
||||
pub fn known_peers(&mut self) -> impl Iterator<Item = &PeerId> {
|
||||
self.kademlia.kbuckets_entries()
|
||||
let mut set = HashSet::new();
|
||||
for p in self.kademlias.values_mut().map(|k| k.kbuckets_entries()).flatten() {
|
||||
set.insert(p);
|
||||
}
|
||||
set.into_iter()
|
||||
}
|
||||
|
||||
/// Adds a hard-coded address for the given peer, that never expires.
|
||||
@@ -149,7 +238,9 @@ impl DiscoveryBehaviour {
|
||||
/// If we didn't know this address before, also generates a `Discovered` event.
|
||||
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
|
||||
if self.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) {
|
||||
self.kademlia.add_address(&peer_id, addr.clone());
|
||||
for k in self.kademlias.values_mut() {
|
||||
k.add_address(&peer_id, addr.clone())
|
||||
}
|
||||
self.discoveries.push_back(peer_id.clone());
|
||||
self.user_defined.push((peer_id, addr));
|
||||
}
|
||||
@@ -160,14 +251,18 @@ impl DiscoveryBehaviour {
|
||||
/// **Note**: It is important that you call this method, otherwise the discovery mechanism will
|
||||
/// not properly work.
|
||||
pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
|
||||
self.kademlia.add_address(peer_id, addr);
|
||||
for k in self.kademlias.values_mut() {
|
||||
k.add_address(peer_id, addr.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// Start fetching a record from the DHT.
|
||||
///
|
||||
/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
|
||||
pub fn get_value(&mut self, key: &record::Key) {
|
||||
self.kademlia.get_record(key, Quorum::One)
|
||||
for k in self.kademlias.values_mut() {
|
||||
k.get_record(key, Quorum::One)
|
||||
}
|
||||
}
|
||||
|
||||
/// Start putting a record into the DHT. Other nodes can later fetch that value with
|
||||
@@ -175,12 +270,14 @@ impl DiscoveryBehaviour {
|
||||
///
|
||||
/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
|
||||
pub fn put_value(&mut self, key: record::Key, value: Vec<u8>) {
|
||||
self.kademlia.put_record(Record::new(key, value), Quorum::All);
|
||||
for k in self.kademlias.values_mut() {
|
||||
k.put_record(Record::new(key.clone(), value.clone()), Quorum::All)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of nodes that are in the Kademlia k-buckets.
|
||||
pub fn num_kbuckets_entries(&mut self) -> usize {
|
||||
self.kademlia.kbuckets_entries().count()
|
||||
self.known_peers().count()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,11 +312,19 @@ pub enum DiscoveryOut {
|
||||
}
|
||||
|
||||
impl NetworkBehaviour for DiscoveryBehaviour {
|
||||
type ProtocolsHandler = <Kademlia<MemoryStore> as NetworkBehaviour>::ProtocolsHandler;
|
||||
type ProtocolsHandler = MultiHandler<ProtocolId, KademliaHandler<QueryId>>;
|
||||
type OutEvent = DiscoveryOut;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
NetworkBehaviour::new_handler(&mut self.kademlia)
|
||||
let iter = self.kademlias.iter_mut()
|
||||
.map(|(p, k)| (p.clone(), NetworkBehaviour::new_handler(k)));
|
||||
|
||||
MultiHandler::try_from_iter(iter)
|
||||
.expect("There can be at most one handler per `ProtocolId` and \
|
||||
protocol names contain the `ProtocolId` so no two protocol \
|
||||
names in `self.kademlias` can be equal which is the only error \
|
||||
`try_from_iter` can return, therefore this call is guaranteed \
|
||||
to succeed; qed")
|
||||
}
|
||||
|
||||
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||
@@ -228,7 +333,11 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
{
|
||||
let mut list_to_filter = self.kademlia.addresses_of_peer(peer_id);
|
||||
let mut list_to_filter = Vec::new();
|
||||
for k in self.kademlias.values_mut() {
|
||||
list_to_filter.extend(k.addresses_of_peer(peer_id))
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
list_to_filter.extend(self.mdns.addresses_of_peer(peer_id));
|
||||
|
||||
@@ -248,13 +357,23 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
||||
}
|
||||
|
||||
trace!(target: "sub-libp2p", "Addresses of {:?} are {:?}", peer_id, list);
|
||||
|
||||
if list.is_empty() {
|
||||
if self.kademlia.kbuckets_entries().any(|p| p == peer_id) {
|
||||
debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer in k-buckets), \
|
||||
and no address was found", peer_id);
|
||||
let mut has_entry = false;
|
||||
for k in self.kademlias.values_mut() {
|
||||
if k.kbuckets_entries().any(|p| p == peer_id) {
|
||||
has_entry = true;
|
||||
break
|
||||
}
|
||||
}
|
||||
if has_entry {
|
||||
debug!(target: "sub-libp2p",
|
||||
"Requested dialing to {:?} (peer in k-buckets), and no address was found",
|
||||
peer_id);
|
||||
} else {
|
||||
debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer not in k-buckets), \
|
||||
and no address was found", peer_id);
|
||||
debug!(target: "sub-libp2p",
|
||||
"Requested dialing to {:?} (peer not in k-buckets), and no address was found",
|
||||
peer_id);
|
||||
}
|
||||
}
|
||||
list
|
||||
@@ -262,20 +381,28 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
||||
|
||||
fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
|
||||
self.num_connections += 1;
|
||||
NetworkBehaviour::inject_connection_established(&mut self.kademlia, peer_id, conn, endpoint)
|
||||
for k in self.kademlias.values_mut() {
|
||||
NetworkBehaviour::inject_connection_established(k, peer_id, conn, endpoint)
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, peer_id: &PeerId) {
|
||||
NetworkBehaviour::inject_connected(&mut self.kademlia, peer_id)
|
||||
for k in self.kademlias.values_mut() {
|
||||
NetworkBehaviour::inject_connected(k, peer_id)
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
|
||||
self.num_connections -= 1;
|
||||
NetworkBehaviour::inject_connection_closed(&mut self.kademlia, peer_id, conn, endpoint)
|
||||
for k in self.kademlias.values_mut() {
|
||||
NetworkBehaviour::inject_connection_closed(k, peer_id, conn, endpoint)
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId) {
|
||||
NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id)
|
||||
for k in self.kademlias.values_mut() {
|
||||
NetworkBehaviour::inject_disconnected(k, peer_id)
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_addr_reach_failure(
|
||||
@@ -284,45 +411,65 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
||||
addr: &Multiaddr,
|
||||
error: &dyn std::error::Error
|
||||
) {
|
||||
NetworkBehaviour::inject_addr_reach_failure(&mut self.kademlia, peer_id, addr, error)
|
||||
for k in self.kademlias.values_mut() {
|
||||
NetworkBehaviour::inject_addr_reach_failure(k, peer_id, addr, error)
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_event(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
connection: ConnectionId,
|
||||
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
|
||||
(pid, event): <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
|
||||
) {
|
||||
NetworkBehaviour::inject_event(&mut self.kademlia, peer_id, connection, event)
|
||||
if let Some(kad) = self.kademlias.get_mut(&pid) {
|
||||
return kad.inject_event(peer_id, connection, event)
|
||||
}
|
||||
log::error!(target: "sub-libp2p",
|
||||
"inject_node_event: no kademlia instance registered for protocol {:?}",
|
||||
pid)
|
||||
}
|
||||
|
||||
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
||||
let new_addr = addr.clone()
|
||||
.with(Protocol::P2p(self.local_peer_id.clone().into()));
|
||||
info!(target: "sub-libp2p", "🔍 Discovered new external address for our node: {}", new_addr);
|
||||
NetworkBehaviour::inject_new_external_addr(&mut self.kademlia, addr)
|
||||
for k in self.kademlias.values_mut() {
|
||||
NetworkBehaviour::inject_new_external_addr(k, addr)
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
|
||||
info!(target: "sub-libp2p", "No longer listening on {}", addr);
|
||||
NetworkBehaviour::inject_expired_listen_addr(&mut self.kademlia, addr)
|
||||
for k in self.kademlias.values_mut() {
|
||||
NetworkBehaviour::inject_expired_listen_addr(k, addr)
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
|
||||
NetworkBehaviour::inject_dial_failure(&mut self.kademlia, peer_id)
|
||||
for k in self.kademlias.values_mut() {
|
||||
NetworkBehaviour::inject_dial_failure(k, peer_id)
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
|
||||
NetworkBehaviour::inject_new_listen_addr(&mut self.kademlia, addr)
|
||||
for k in self.kademlias.values_mut() {
|
||||
NetworkBehaviour::inject_new_listen_addr(k, addr)
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
|
||||
error!(target: "sub-libp2p", "Error on libp2p listener {:?}: {}", id, err);
|
||||
NetworkBehaviour::inject_listener_error(&mut self.kademlia, id, err);
|
||||
for k in self.kademlias.values_mut() {
|
||||
NetworkBehaviour::inject_listener_error(k, id, err)
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
|
||||
NetworkBehaviour::inject_listener_closed(&mut self.kademlia, id, reason);
|
||||
error!(target: "sub-libp2p", "Libp2p listener {:?} closed", id);
|
||||
for k in self.kademlias.values_mut() {
|
||||
NetworkBehaviour::inject_listener_closed(k, id, reason)
|
||||
}
|
||||
}
|
||||
|
||||
fn poll(
|
||||
@@ -348,10 +495,10 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
||||
debug!(target: "sub-libp2p",
|
||||
"Libp2p <= Starting random Kademlia request for {:?}",
|
||||
random_peer_id);
|
||||
|
||||
self.kademlia.get_closest_peers(random_peer_id);
|
||||
for k in self.kademlias.values_mut() {
|
||||
k.get_closest_peers(random_peer_id.clone())
|
||||
}
|
||||
true
|
||||
|
||||
} else {
|
||||
debug!(
|
||||
target: "sub-libp2p",
|
||||
@@ -373,96 +520,102 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
||||
}
|
||||
}
|
||||
|
||||
// Poll Kademlia.
|
||||
while let Poll::Ready(ev) = self.kademlia.poll(cx, params) {
|
||||
match ev {
|
||||
NetworkBehaviourAction::GenerateEvent(ev) => match ev {
|
||||
KademliaEvent::UnroutablePeer { peer, .. } => {
|
||||
let ev = DiscoveryOut::UnroutablePeer(peer);
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaEvent::RoutingUpdated { peer, .. } => {
|
||||
let ev = DiscoveryOut::Discovered(peer);
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaEvent::GetClosestPeersResult(res) => {
|
||||
match res {
|
||||
Err(GetClosestPeersError::Timeout { key, peers }) => {
|
||||
debug!(target: "sub-libp2p",
|
||||
"Libp2p => Query for {:?} timed out with {} results",
|
||||
HexDisplay::from(&key), peers.len());
|
||||
},
|
||||
Ok(ok) => {
|
||||
trace!(target: "sub-libp2p",
|
||||
"Libp2p => Query for {:?} yielded {:?} results",
|
||||
HexDisplay::from(&ok.key), ok.peers.len());
|
||||
if ok.peers.is_empty() && self.num_connections != 0 {
|
||||
debug!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
|
||||
results");
|
||||
// Poll Kademlias.
|
||||
for (pid, kademlia) in &mut self.kademlias {
|
||||
while let Poll::Ready(ev) = kademlia.poll(cx, params) {
|
||||
match ev {
|
||||
NetworkBehaviourAction::GenerateEvent(ev) => match ev {
|
||||
KademliaEvent::UnroutablePeer { peer, .. } => {
|
||||
let ev = DiscoveryOut::UnroutablePeer(peer);
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaEvent::RoutingUpdated { peer, .. } => {
|
||||
let ev = DiscoveryOut::Discovered(peer);
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaEvent::GetClosestPeersResult(res) => {
|
||||
match res {
|
||||
Err(GetClosestPeersError::Timeout { key, peers }) => {
|
||||
debug!(target: "sub-libp2p",
|
||||
"Libp2p => Query for {:?} timed out with {} results",
|
||||
HexDisplay::from(&key), peers.len());
|
||||
},
|
||||
Ok(ok) => {
|
||||
trace!(target: "sub-libp2p",
|
||||
"Libp2p => Query for {:?} yielded {:?} results",
|
||||
HexDisplay::from(&ok.key), ok.peers.len());
|
||||
if ok.peers.is_empty() && self.num_connections != 0 {
|
||||
debug!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
|
||||
results");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
KademliaEvent::GetRecordResult(res) => {
|
||||
let ev = match res {
|
||||
Ok(ok) => {
|
||||
let results = ok.records
|
||||
.into_iter()
|
||||
.map(|r| (r.key, r.value))
|
||||
.collect();
|
||||
KademliaEvent::GetRecordResult(res) => {
|
||||
let ev = match res {
|
||||
Ok(ok) => {
|
||||
let results = ok.records
|
||||
.into_iter()
|
||||
.map(|r| (r.key, r.value))
|
||||
.collect();
|
||||
|
||||
DiscoveryOut::ValueFound(results)
|
||||
DiscoveryOut::ValueFound(results)
|
||||
}
|
||||
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
|
||||
trace!(target: "sub-libp2p",
|
||||
"Libp2p => Failed to get record: {:?}", e);
|
||||
DiscoveryOut::ValueNotFound(e.into_key())
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(target: "sub-libp2p",
|
||||
"Libp2p => Failed to get record: {:?}", e);
|
||||
DiscoveryOut::ValueNotFound(e.into_key())
|
||||
}
|
||||
};
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaEvent::PutRecordResult(res) => {
|
||||
let ev = match res {
|
||||
Ok(ok) => DiscoveryOut::ValuePut(ok.key),
|
||||
Err(e) => {
|
||||
warn!(target: "sub-libp2p",
|
||||
"Libp2p => Failed to put record: {:?}", e);
|
||||
DiscoveryOut::ValuePutFailed(e.into_key())
|
||||
}
|
||||
};
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaEvent::RepublishRecordResult(res) => {
|
||||
match res {
|
||||
Ok(ok) => debug!(target: "sub-libp2p",
|
||||
"Libp2p => Record republished: {:?}",
|
||||
ok.key),
|
||||
Err(e) => warn!(target: "sub-libp2p",
|
||||
"Libp2p => Republishing of record {:?} failed with: {:?}",
|
||||
e.key(), e)
|
||||
}
|
||||
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
|
||||
trace!(target: "sub-libp2p",
|
||||
"Libp2p => Failed to get record: {:?}", e);
|
||||
DiscoveryOut::ValueNotFound(e.into_key())
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(target: "sub-libp2p",
|
||||
"Libp2p => Failed to get record: {:?}", e);
|
||||
DiscoveryOut::ValueNotFound(e.into_key())
|
||||
}
|
||||
};
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaEvent::PutRecordResult(res) => {
|
||||
let ev = match res {
|
||||
Ok(ok) => DiscoveryOut::ValuePut(ok.key),
|
||||
Err(e) => {
|
||||
warn!(target: "sub-libp2p",
|
||||
"Libp2p => Failed to put record: {:?}", e);
|
||||
DiscoveryOut::ValuePutFailed(e.into_key())
|
||||
}
|
||||
};
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaEvent::RepublishRecordResult(res) => {
|
||||
match res {
|
||||
Ok(ok) => debug!(target: "sub-libp2p",
|
||||
"Libp2p => Record republished: {:?}",
|
||||
ok.key),
|
||||
Err(e) => warn!(target: "sub-libp2p",
|
||||
"Libp2p => Republishing of record {:?} failed with: {:?}",
|
||||
e.key(), e)
|
||||
}
|
||||
KademliaEvent::Discovered { .. } => {
|
||||
// We are not interested in these events at the moment.
|
||||
}
|
||||
// We never start any other type of query.
|
||||
e => {
|
||||
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
|
||||
}
|
||||
}
|
||||
KademliaEvent::Discovered { .. } => {
|
||||
// We are not interested in these events at the moment.
|
||||
}
|
||||
// We never start any other type of query.
|
||||
e => {
|
||||
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
|
||||
}
|
||||
},
|
||||
NetworkBehaviourAction::DialAddress { address } =>
|
||||
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
|
||||
NetworkBehaviourAction::DialPeer { peer_id, condition } =>
|
||||
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
|
||||
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
|
||||
return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }),
|
||||
NetworkBehaviourAction::ReportObservedAddr { address } =>
|
||||
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
|
||||
NetworkBehaviourAction::DialAddress { address } =>
|
||||
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
|
||||
NetworkBehaviourAction::DialPeer { peer_id, condition } =>
|
||||
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
|
||||
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
|
||||
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler,
|
||||
event: (pid.clone(), event)
|
||||
}),
|
||||
NetworkBehaviourAction::ReportObservedAddr { address } =>
|
||||
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -511,7 +664,7 @@ mod tests {
|
||||
use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt};
|
||||
use libp2p::swarm::Swarm;
|
||||
use std::{collections::HashSet, task::Poll};
|
||||
use super::{DiscoveryBehaviour, DiscoveryOut};
|
||||
use super::{DiscoveryConfig, DiscoveryOut};
|
||||
|
||||
#[test]
|
||||
fn discovery_working() {
|
||||
@@ -540,13 +693,14 @@ mod tests {
|
||||
upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1)
|
||||
});
|
||||
|
||||
let behaviour = futures::executor::block_on({
|
||||
let user_defined = user_defined.clone();
|
||||
let keypair_public = keypair.public();
|
||||
async move {
|
||||
DiscoveryBehaviour::new(keypair_public, user_defined, false, true, 50).await
|
||||
}
|
||||
});
|
||||
let behaviour = {
|
||||
let mut config = DiscoveryConfig::new(keypair.public());
|
||||
config.with_user_defined(user_defined.clone())
|
||||
.allow_private_ipv4(true)
|
||||
.discovery_limit(50);
|
||||
config.finish()
|
||||
};
|
||||
|
||||
let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
|
||||
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
use crate::{
|
||||
behaviour::{Behaviour, BehaviourOut},
|
||||
config::{parse_addr, parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
|
||||
discovery::DiscoveryConfig,
|
||||
error::Error,
|
||||
network_state::{
|
||||
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
|
||||
@@ -310,24 +311,37 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
|
||||
peerset_handle.clone(),
|
||||
)
|
||||
};
|
||||
let mut behaviour = futures::executor::block_on(Behaviour::new(
|
||||
|
||||
let discovery_config = {
|
||||
let mut config = DiscoveryConfig::new(local_public.clone());
|
||||
config.with_user_defined(known_addresses);
|
||||
config.discovery_limit(u64::from(params.network_config.out_peers) + 15);
|
||||
config.add_protocol(params.protocol_id.clone());
|
||||
|
||||
match params.network_config.transport {
|
||||
TransportConfig::MemoryOnly => {
|
||||
config.with_mdns(false);
|
||||
config.allow_private_ipv4(false);
|
||||
}
|
||||
TransportConfig::Normal { enable_mdns, allow_private_ipv4, .. } => {
|
||||
config.with_mdns(enable_mdns);
|
||||
config.allow_private_ipv4(allow_private_ipv4);
|
||||
}
|
||||
}
|
||||
|
||||
config
|
||||
};
|
||||
|
||||
let mut behaviour = Behaviour::new(
|
||||
protocol,
|
||||
params.role,
|
||||
user_agent,
|
||||
local_public,
|
||||
known_addresses,
|
||||
match params.network_config.transport {
|
||||
TransportConfig::MemoryOnly => false,
|
||||
TransportConfig::Normal { enable_mdns, .. } => enable_mdns,
|
||||
},
|
||||
match params.network_config.transport {
|
||||
TransportConfig::MemoryOnly => false,
|
||||
TransportConfig::Normal { allow_private_ipv4, .. } => allow_private_ipv4,
|
||||
},
|
||||
u64::from(params.network_config.out_peers) + 15,
|
||||
block_requests,
|
||||
light_client_handler
|
||||
));
|
||||
light_client_handler,
|
||||
discovery_config
|
||||
);
|
||||
|
||||
for (engine_id, protocol_name) in ¶ms.network_config.notifications_protocols {
|
||||
behaviour.register_notifications_protocol(*engine_id, protocol_name.clone());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user