mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 15:07:59 +00:00
Use a Kademlia instance per ProtocolId. (#5045)
This commit is contained in:
@@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::Role,
|
config::Role,
|
||||||
debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut,
|
debug_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
|
||||||
Event, ObservedRole, DhtEvent, ExHashT,
|
Event, ObservedRole, DhtEvent, ExHashT,
|
||||||
};
|
};
|
||||||
use crate::protocol::{self, light_client_handler, message::Roles, CustomMessageOutcome, Protocol};
|
use crate::protocol::{self, light_client_handler, message::Roles, CustomMessageOutcome, Protocol};
|
||||||
@@ -67,28 +67,19 @@ pub enum BehaviourOut<B: BlockT> {
|
|||||||
|
|
||||||
impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
|
impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
|
||||||
/// Builds a new `Behaviour`.
|
/// Builds a new `Behaviour`.
|
||||||
pub async fn new(
|
pub fn new(
|
||||||
substrate: Protocol<B, H>,
|
substrate: Protocol<B, H>,
|
||||||
role: Role,
|
role: Role,
|
||||||
user_agent: String,
|
user_agent: String,
|
||||||
local_public_key: PublicKey,
|
local_public_key: PublicKey,
|
||||||
known_addresses: Vec<(PeerId, Multiaddr)>,
|
|
||||||
enable_mdns: bool,
|
|
||||||
allow_private_ipv4: bool,
|
|
||||||
discovery_only_if_under_num: u64,
|
|
||||||
block_requests: protocol::BlockRequests<B>,
|
block_requests: protocol::BlockRequests<B>,
|
||||||
light_client_handler: protocol::LightClientHandler<B>,
|
light_client_handler: protocol::LightClientHandler<B>,
|
||||||
|
disco_config: DiscoveryConfig,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Behaviour {
|
Behaviour {
|
||||||
substrate,
|
substrate,
|
||||||
debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()),
|
debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()),
|
||||||
discovery: DiscoveryBehaviour::new(
|
discovery: disco_config.finish(),
|
||||||
local_public_key,
|
|
||||||
known_addresses,
|
|
||||||
enable_mdns,
|
|
||||||
allow_private_ipv4,
|
|
||||||
discovery_only_if_under_num,
|
|
||||||
).await,
|
|
||||||
block_requests,
|
block_requests,
|
||||||
light_client_handler,
|
light_client_handler,
|
||||||
events: Vec::new(),
|
events: Vec::new(),
|
||||||
|
|||||||
@@ -45,30 +45,162 @@
|
|||||||
//! of a node's address, you must call `add_self_reported_address`.
|
//! of a node's address, you must call `add_self_reported_address`.
|
||||||
//!
|
//!
|
||||||
|
|
||||||
|
use crate::config::ProtocolId;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures_timer::Delay;
|
use futures_timer::Delay;
|
||||||
use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey};
|
use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey};
|
||||||
use libp2p::swarm::{ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
|
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
|
||||||
use libp2p::kad::{Kademlia, KademliaEvent, Quorum, Record};
|
use libp2p::swarm::protocols_handler::multi::MultiHandler;
|
||||||
|
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, Quorum, Record};
|
||||||
use libp2p::kad::GetClosestPeersError;
|
use libp2p::kad::GetClosestPeersError;
|
||||||
|
use libp2p::kad::handler::KademliaHandler;
|
||||||
|
use libp2p::kad::QueryId;
|
||||||
use libp2p::kad::record::{self, store::MemoryStore};
|
use libp2p::kad::record::{self, store::MemoryStore};
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
use libp2p::{swarm::toggle::Toggle};
|
use libp2p::swarm::toggle::Toggle;
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
use libp2p::mdns::{Mdns, MdnsEvent};
|
use libp2p::mdns::{Mdns, MdnsEvent};
|
||||||
use libp2p::multiaddr::Protocol;
|
use libp2p::multiaddr::Protocol;
|
||||||
use log::{debug, info, trace, warn, error};
|
use log::{debug, info, trace, warn, error};
|
||||||
use std::{cmp, collections::VecDeque, io, time::Duration};
|
use std::{cmp, collections::{HashMap, HashSet, VecDeque}, io, time::Duration};
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use sp_core::hexdisplay::HexDisplay;
|
use sp_core::hexdisplay::HexDisplay;
|
||||||
|
|
||||||
|
/// `DiscoveryBehaviour` configuration.
|
||||||
|
pub struct DiscoveryConfig {
|
||||||
|
local_peer_id: PeerId,
|
||||||
|
user_defined: Vec<(PeerId, Multiaddr)>,
|
||||||
|
allow_private_ipv4: bool,
|
||||||
|
discovery_only_if_under_num: u64,
|
||||||
|
enable_mdns: bool,
|
||||||
|
kademlias: HashMap<ProtocolId, Kademlia<MemoryStore>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DiscoveryConfig {
|
||||||
|
/// Crate a default configuration with the given public key.
|
||||||
|
pub fn new(local_public_key: PublicKey) -> Self {
|
||||||
|
let mut this = DiscoveryConfig {
|
||||||
|
local_peer_id: local_public_key.into_peer_id(),
|
||||||
|
user_defined: Vec::new(),
|
||||||
|
allow_private_ipv4: true,
|
||||||
|
discovery_only_if_under_num: std::u64::MAX,
|
||||||
|
enable_mdns: false,
|
||||||
|
kademlias: HashMap::new()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Temporary hack to retain backwards compatibility.
|
||||||
|
// We should eventually remove the special handling of DEFAULT_PROTO_NAME.
|
||||||
|
let proto_id = ProtocolId::from(libp2p::kad::protocol::DEFAULT_PROTO_NAME);
|
||||||
|
let proto_name = Vec::from(proto_id.as_bytes());
|
||||||
|
this.add_kademlia(proto_id, proto_name);
|
||||||
|
|
||||||
|
this
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the number of active connections at which we pause discovery.
|
||||||
|
pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
|
||||||
|
self.discovery_only_if_under_num = limit;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set custom nodes which never expire, e.g. bootstrap or reserved nodes.
|
||||||
|
pub fn with_user_defined<I>(&mut self, user_defined: I) -> &mut Self
|
||||||
|
where
|
||||||
|
I: IntoIterator<Item = (PeerId, Multiaddr)>
|
||||||
|
{
|
||||||
|
for (peer_id, addr) in user_defined {
|
||||||
|
for kad in self.kademlias.values_mut() {
|
||||||
|
kad.add_address(&peer_id, addr.clone())
|
||||||
|
}
|
||||||
|
self.user_defined.push((peer_id, addr))
|
||||||
|
}
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Should private IPv4 addresses be reported?
|
||||||
|
pub fn allow_private_ipv4(&mut self, value: bool) -> &mut Self {
|
||||||
|
self.allow_private_ipv4 = value;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Should MDNS discovery be supported?
|
||||||
|
pub fn with_mdns(&mut self, value: bool) -> &mut Self {
|
||||||
|
if value && cfg!(target_os = "unknown") {
|
||||||
|
log::warn!(target: "sub-libp2p", "mDNS is not available on this platform")
|
||||||
|
}
|
||||||
|
self.enable_mdns = value;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add discovery via Kademlia for the given protocol.
|
||||||
|
pub fn add_protocol(&mut self, p: ProtocolId) -> &mut Self {
|
||||||
|
// NB: If this protocol name derivation is changed, check if
|
||||||
|
// `DiscoveryBehaviour::new_handler` is still correct.
|
||||||
|
let proto_name = {
|
||||||
|
let mut v = vec![b'/'];
|
||||||
|
v.extend_from_slice(p.as_bytes());
|
||||||
|
v.extend_from_slice(b"/kad");
|
||||||
|
v
|
||||||
|
};
|
||||||
|
|
||||||
|
self.add_kademlia(p, proto_name);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_kademlia(&mut self, id: ProtocolId, proto_name: Vec<u8>) {
|
||||||
|
if self.kademlias.contains_key(&id) {
|
||||||
|
warn!(target: "sub-libp2p", "Discovery already registered for protocol {:?}", id);
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut config = KademliaConfig::default();
|
||||||
|
config.set_protocol_name(proto_name);
|
||||||
|
|
||||||
|
let store = MemoryStore::new(self.local_peer_id.clone());
|
||||||
|
let mut kad = Kademlia::with_config(self.local_peer_id.clone(), store, config);
|
||||||
|
|
||||||
|
for (peer_id, addr) in &self.user_defined {
|
||||||
|
kad.add_address(peer_id, addr.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
self.kademlias.insert(id, kad);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a `DiscoveryBehaviour` from this config.
|
||||||
|
pub fn finish(self) -> DiscoveryBehaviour {
|
||||||
|
DiscoveryBehaviour {
|
||||||
|
user_defined: self.user_defined,
|
||||||
|
kademlias: self.kademlias,
|
||||||
|
next_kad_random_query: Delay::new(Duration::new(0, 0)),
|
||||||
|
duration_to_next_kad: Duration::from_secs(1),
|
||||||
|
discoveries: VecDeque::new(),
|
||||||
|
local_peer_id: self.local_peer_id,
|
||||||
|
num_connections: 0,
|
||||||
|
allow_private_ipv4: self.allow_private_ipv4,
|
||||||
|
discovery_only_if_under_num: self.discovery_only_if_under_num,
|
||||||
|
#[cfg(not(target_os = "unknown"))]
|
||||||
|
mdns: if self.enable_mdns {
|
||||||
|
match Mdns::new() {
|
||||||
|
Ok(mdns) => Some(mdns).into(),
|
||||||
|
Err(err) => {
|
||||||
|
warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
|
||||||
|
None.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None.into()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
|
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
|
||||||
pub struct DiscoveryBehaviour {
|
pub struct DiscoveryBehaviour {
|
||||||
/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
|
/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
|
||||||
/// reserved nodes.
|
/// reserved nodes.
|
||||||
user_defined: Vec<(PeerId, Multiaddr)>,
|
user_defined: Vec<(PeerId, Multiaddr)>,
|
||||||
/// Kademlia requests and answers.
|
/// Kademlia requests and answers.
|
||||||
kademlia: Kademlia<MemoryStore>,
|
kademlias: HashMap<ProtocolId, Kademlia<MemoryStore>>,
|
||||||
/// Discovers nodes on the local network.
|
/// Discovers nodes on the local network.
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
mdns: Toggle<Mdns>,
|
mdns: Toggle<Mdns>,
|
||||||
@@ -90,56 +222,13 @@ pub struct DiscoveryBehaviour {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl DiscoveryBehaviour {
|
impl DiscoveryBehaviour {
|
||||||
/// Builds a new `DiscoveryBehaviour`.
|
|
||||||
///
|
|
||||||
/// `user_defined` is a list of known address for nodes that never expire.
|
|
||||||
pub async fn new(
|
|
||||||
local_public_key: PublicKey,
|
|
||||||
user_defined: Vec<(PeerId, Multiaddr)>,
|
|
||||||
enable_mdns: bool,
|
|
||||||
allow_private_ipv4: bool,
|
|
||||||
discovery_only_if_under_num: u64,
|
|
||||||
) -> Self {
|
|
||||||
if enable_mdns {
|
|
||||||
#[cfg(target_os = "unknown")]
|
|
||||||
warn!(target: "sub-libp2p", "mDNS is not available on this platform");
|
|
||||||
}
|
|
||||||
|
|
||||||
let local_id = local_public_key.clone().into_peer_id();
|
|
||||||
let store = MemoryStore::new(local_id.clone());
|
|
||||||
let mut kademlia = Kademlia::new(local_id.clone(), store);
|
|
||||||
for (peer_id, addr) in &user_defined {
|
|
||||||
kademlia.add_address(peer_id, addr.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
DiscoveryBehaviour {
|
|
||||||
user_defined,
|
|
||||||
kademlia,
|
|
||||||
next_kad_random_query: Delay::new(Duration::new(0, 0)),
|
|
||||||
duration_to_next_kad: Duration::from_secs(1),
|
|
||||||
discoveries: VecDeque::new(),
|
|
||||||
local_peer_id: local_public_key.into_peer_id(),
|
|
||||||
num_connections: 0,
|
|
||||||
allow_private_ipv4,
|
|
||||||
discovery_only_if_under_num,
|
|
||||||
#[cfg(not(target_os = "unknown"))]
|
|
||||||
mdns: if enable_mdns {
|
|
||||||
match Mdns::new() {
|
|
||||||
Ok(mdns) => Some(mdns).into(),
|
|
||||||
Err(err) => {
|
|
||||||
warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
|
|
||||||
None.into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
None.into()
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the list of nodes that we know exist in the network.
|
/// Returns the list of nodes that we know exist in the network.
|
||||||
pub fn known_peers(&mut self) -> impl Iterator<Item = &PeerId> {
|
pub fn known_peers(&mut self) -> impl Iterator<Item = &PeerId> {
|
||||||
self.kademlia.kbuckets_entries()
|
let mut set = HashSet::new();
|
||||||
|
for p in self.kademlias.values_mut().map(|k| k.kbuckets_entries()).flatten() {
|
||||||
|
set.insert(p);
|
||||||
|
}
|
||||||
|
set.into_iter()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adds a hard-coded address for the given peer, that never expires.
|
/// Adds a hard-coded address for the given peer, that never expires.
|
||||||
@@ -149,7 +238,9 @@ impl DiscoveryBehaviour {
|
|||||||
/// If we didn't know this address before, also generates a `Discovered` event.
|
/// If we didn't know this address before, also generates a `Discovered` event.
|
||||||
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
|
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
|
||||||
if self.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) {
|
if self.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) {
|
||||||
self.kademlia.add_address(&peer_id, addr.clone());
|
for k in self.kademlias.values_mut() {
|
||||||
|
k.add_address(&peer_id, addr.clone())
|
||||||
|
}
|
||||||
self.discoveries.push_back(peer_id.clone());
|
self.discoveries.push_back(peer_id.clone());
|
||||||
self.user_defined.push((peer_id, addr));
|
self.user_defined.push((peer_id, addr));
|
||||||
}
|
}
|
||||||
@@ -160,14 +251,18 @@ impl DiscoveryBehaviour {
|
|||||||
/// **Note**: It is important that you call this method, otherwise the discovery mechanism will
|
/// **Note**: It is important that you call this method, otherwise the discovery mechanism will
|
||||||
/// not properly work.
|
/// not properly work.
|
||||||
pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
|
pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
|
||||||
self.kademlia.add_address(peer_id, addr);
|
for k in self.kademlias.values_mut() {
|
||||||
|
k.add_address(peer_id, addr.clone())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start fetching a record from the DHT.
|
/// Start fetching a record from the DHT.
|
||||||
///
|
///
|
||||||
/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
|
/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
|
||||||
pub fn get_value(&mut self, key: &record::Key) {
|
pub fn get_value(&mut self, key: &record::Key) {
|
||||||
self.kademlia.get_record(key, Quorum::One)
|
for k in self.kademlias.values_mut() {
|
||||||
|
k.get_record(key, Quorum::One)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start putting a record into the DHT. Other nodes can later fetch that value with
|
/// Start putting a record into the DHT. Other nodes can later fetch that value with
|
||||||
@@ -175,12 +270,14 @@ impl DiscoveryBehaviour {
|
|||||||
///
|
///
|
||||||
/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
|
/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
|
||||||
pub fn put_value(&mut self, key: record::Key, value: Vec<u8>) {
|
pub fn put_value(&mut self, key: record::Key, value: Vec<u8>) {
|
||||||
self.kademlia.put_record(Record::new(key, value), Quorum::All);
|
for k in self.kademlias.values_mut() {
|
||||||
|
k.put_record(Record::new(key.clone(), value.clone()), Quorum::All)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the number of nodes that are in the Kademlia k-buckets.
|
/// Returns the number of nodes that are in the Kademlia k-buckets.
|
||||||
pub fn num_kbuckets_entries(&mut self) -> usize {
|
pub fn num_kbuckets_entries(&mut self) -> usize {
|
||||||
self.kademlia.kbuckets_entries().count()
|
self.known_peers().count()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -215,11 +312,19 @@ pub enum DiscoveryOut {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl NetworkBehaviour for DiscoveryBehaviour {
|
impl NetworkBehaviour for DiscoveryBehaviour {
|
||||||
type ProtocolsHandler = <Kademlia<MemoryStore> as NetworkBehaviour>::ProtocolsHandler;
|
type ProtocolsHandler = MultiHandler<ProtocolId, KademliaHandler<QueryId>>;
|
||||||
type OutEvent = DiscoveryOut;
|
type OutEvent = DiscoveryOut;
|
||||||
|
|
||||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||||
NetworkBehaviour::new_handler(&mut self.kademlia)
|
let iter = self.kademlias.iter_mut()
|
||||||
|
.map(|(p, k)| (p.clone(), NetworkBehaviour::new_handler(k)));
|
||||||
|
|
||||||
|
MultiHandler::try_from_iter(iter)
|
||||||
|
.expect("There can be at most one handler per `ProtocolId` and \
|
||||||
|
protocol names contain the `ProtocolId` so no two protocol \
|
||||||
|
names in `self.kademlias` can be equal which is the only error \
|
||||||
|
`try_from_iter` can return, therefore this call is guaranteed \
|
||||||
|
to succeed; qed")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||||
@@ -228,7 +333,11 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
|||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut list_to_filter = self.kademlia.addresses_of_peer(peer_id);
|
let mut list_to_filter = Vec::new();
|
||||||
|
for k in self.kademlias.values_mut() {
|
||||||
|
list_to_filter.extend(k.addresses_of_peer(peer_id))
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
list_to_filter.extend(self.mdns.addresses_of_peer(peer_id));
|
list_to_filter.extend(self.mdns.addresses_of_peer(peer_id));
|
||||||
|
|
||||||
@@ -248,13 +357,23 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
|||||||
}
|
}
|
||||||
|
|
||||||
trace!(target: "sub-libp2p", "Addresses of {:?} are {:?}", peer_id, list);
|
trace!(target: "sub-libp2p", "Addresses of {:?} are {:?}", peer_id, list);
|
||||||
|
|
||||||
if list.is_empty() {
|
if list.is_empty() {
|
||||||
if self.kademlia.kbuckets_entries().any(|p| p == peer_id) {
|
let mut has_entry = false;
|
||||||
debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer in k-buckets), \
|
for k in self.kademlias.values_mut() {
|
||||||
and no address was found", peer_id);
|
if k.kbuckets_entries().any(|p| p == peer_id) {
|
||||||
|
has_entry = true;
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if has_entry {
|
||||||
|
debug!(target: "sub-libp2p",
|
||||||
|
"Requested dialing to {:?} (peer in k-buckets), and no address was found",
|
||||||
|
peer_id);
|
||||||
} else {
|
} else {
|
||||||
debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer not in k-buckets), \
|
debug!(target: "sub-libp2p",
|
||||||
and no address was found", peer_id);
|
"Requested dialing to {:?} (peer not in k-buckets), and no address was found",
|
||||||
|
peer_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
list
|
list
|
||||||
@@ -262,20 +381,28 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
|||||||
|
|
||||||
fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
|
fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
|
||||||
self.num_connections += 1;
|
self.num_connections += 1;
|
||||||
NetworkBehaviour::inject_connection_established(&mut self.kademlia, peer_id, conn, endpoint)
|
for k in self.kademlias.values_mut() {
|
||||||
|
NetworkBehaviour::inject_connection_established(k, peer_id, conn, endpoint)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_connected(&mut self, peer_id: &PeerId) {
|
fn inject_connected(&mut self, peer_id: &PeerId) {
|
||||||
NetworkBehaviour::inject_connected(&mut self.kademlia, peer_id)
|
for k in self.kademlias.values_mut() {
|
||||||
|
NetworkBehaviour::inject_connected(k, peer_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
|
fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
|
||||||
self.num_connections -= 1;
|
self.num_connections -= 1;
|
||||||
NetworkBehaviour::inject_connection_closed(&mut self.kademlia, peer_id, conn, endpoint)
|
for k in self.kademlias.values_mut() {
|
||||||
|
NetworkBehaviour::inject_connection_closed(k, peer_id, conn, endpoint)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_disconnected(&mut self, peer_id: &PeerId) {
|
fn inject_disconnected(&mut self, peer_id: &PeerId) {
|
||||||
NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id)
|
for k in self.kademlias.values_mut() {
|
||||||
|
NetworkBehaviour::inject_disconnected(k, peer_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_addr_reach_failure(
|
fn inject_addr_reach_failure(
|
||||||
@@ -284,45 +411,65 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
|||||||
addr: &Multiaddr,
|
addr: &Multiaddr,
|
||||||
error: &dyn std::error::Error
|
error: &dyn std::error::Error
|
||||||
) {
|
) {
|
||||||
NetworkBehaviour::inject_addr_reach_failure(&mut self.kademlia, peer_id, addr, error)
|
for k in self.kademlias.values_mut() {
|
||||||
|
NetworkBehaviour::inject_addr_reach_failure(k, peer_id, addr, error)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_event(
|
fn inject_event(
|
||||||
&mut self,
|
&mut self,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
|
(pid, event): <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
|
||||||
) {
|
) {
|
||||||
NetworkBehaviour::inject_event(&mut self.kademlia, peer_id, connection, event)
|
if let Some(kad) = self.kademlias.get_mut(&pid) {
|
||||||
|
return kad.inject_event(peer_id, connection, event)
|
||||||
|
}
|
||||||
|
log::error!(target: "sub-libp2p",
|
||||||
|
"inject_node_event: no kademlia instance registered for protocol {:?}",
|
||||||
|
pid)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
||||||
let new_addr = addr.clone()
|
let new_addr = addr.clone()
|
||||||
.with(Protocol::P2p(self.local_peer_id.clone().into()));
|
.with(Protocol::P2p(self.local_peer_id.clone().into()));
|
||||||
info!(target: "sub-libp2p", "🔍 Discovered new external address for our node: {}", new_addr);
|
info!(target: "sub-libp2p", "🔍 Discovered new external address for our node: {}", new_addr);
|
||||||
NetworkBehaviour::inject_new_external_addr(&mut self.kademlia, addr)
|
for k in self.kademlias.values_mut() {
|
||||||
|
NetworkBehaviour::inject_new_external_addr(k, addr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
|
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
|
||||||
info!(target: "sub-libp2p", "No longer listening on {}", addr);
|
info!(target: "sub-libp2p", "No longer listening on {}", addr);
|
||||||
NetworkBehaviour::inject_expired_listen_addr(&mut self.kademlia, addr)
|
for k in self.kademlias.values_mut() {
|
||||||
|
NetworkBehaviour::inject_expired_listen_addr(k, addr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
|
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
|
||||||
NetworkBehaviour::inject_dial_failure(&mut self.kademlia, peer_id)
|
for k in self.kademlias.values_mut() {
|
||||||
|
NetworkBehaviour::inject_dial_failure(k, peer_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
|
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
|
||||||
NetworkBehaviour::inject_new_listen_addr(&mut self.kademlia, addr)
|
for k in self.kademlias.values_mut() {
|
||||||
|
NetworkBehaviour::inject_new_listen_addr(k, addr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
|
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
|
||||||
error!(target: "sub-libp2p", "Error on libp2p listener {:?}: {}", id, err);
|
error!(target: "sub-libp2p", "Error on libp2p listener {:?}: {}", id, err);
|
||||||
NetworkBehaviour::inject_listener_error(&mut self.kademlia, id, err);
|
for k in self.kademlias.values_mut() {
|
||||||
|
NetworkBehaviour::inject_listener_error(k, id, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
|
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
|
||||||
NetworkBehaviour::inject_listener_closed(&mut self.kademlia, id, reason);
|
error!(target: "sub-libp2p", "Libp2p listener {:?} closed", id);
|
||||||
|
for k in self.kademlias.values_mut() {
|
||||||
|
NetworkBehaviour::inject_listener_closed(k, id, reason)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll(
|
fn poll(
|
||||||
@@ -348,10 +495,10 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
|||||||
debug!(target: "sub-libp2p",
|
debug!(target: "sub-libp2p",
|
||||||
"Libp2p <= Starting random Kademlia request for {:?}",
|
"Libp2p <= Starting random Kademlia request for {:?}",
|
||||||
random_peer_id);
|
random_peer_id);
|
||||||
|
for k in self.kademlias.values_mut() {
|
||||||
self.kademlia.get_closest_peers(random_peer_id);
|
k.get_closest_peers(random_peer_id.clone())
|
||||||
|
}
|
||||||
true
|
true
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
debug!(
|
debug!(
|
||||||
target: "sub-libp2p",
|
target: "sub-libp2p",
|
||||||
@@ -373,96 +520,102 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Poll Kademlia.
|
// Poll Kademlias.
|
||||||
while let Poll::Ready(ev) = self.kademlia.poll(cx, params) {
|
for (pid, kademlia) in &mut self.kademlias {
|
||||||
match ev {
|
while let Poll::Ready(ev) = kademlia.poll(cx, params) {
|
||||||
NetworkBehaviourAction::GenerateEvent(ev) => match ev {
|
match ev {
|
||||||
KademliaEvent::UnroutablePeer { peer, .. } => {
|
NetworkBehaviourAction::GenerateEvent(ev) => match ev {
|
||||||
let ev = DiscoveryOut::UnroutablePeer(peer);
|
KademliaEvent::UnroutablePeer { peer, .. } => {
|
||||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
let ev = DiscoveryOut::UnroutablePeer(peer);
|
||||||
}
|
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||||
KademliaEvent::RoutingUpdated { peer, .. } => {
|
}
|
||||||
let ev = DiscoveryOut::Discovered(peer);
|
KademliaEvent::RoutingUpdated { peer, .. } => {
|
||||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
let ev = DiscoveryOut::Discovered(peer);
|
||||||
}
|
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||||
KademliaEvent::GetClosestPeersResult(res) => {
|
}
|
||||||
match res {
|
KademliaEvent::GetClosestPeersResult(res) => {
|
||||||
Err(GetClosestPeersError::Timeout { key, peers }) => {
|
match res {
|
||||||
debug!(target: "sub-libp2p",
|
Err(GetClosestPeersError::Timeout { key, peers }) => {
|
||||||
"Libp2p => Query for {:?} timed out with {} results",
|
debug!(target: "sub-libp2p",
|
||||||
HexDisplay::from(&key), peers.len());
|
"Libp2p => Query for {:?} timed out with {} results",
|
||||||
},
|
HexDisplay::from(&key), peers.len());
|
||||||
Ok(ok) => {
|
},
|
||||||
trace!(target: "sub-libp2p",
|
Ok(ok) => {
|
||||||
"Libp2p => Query for {:?} yielded {:?} results",
|
trace!(target: "sub-libp2p",
|
||||||
HexDisplay::from(&ok.key), ok.peers.len());
|
"Libp2p => Query for {:?} yielded {:?} results",
|
||||||
if ok.peers.is_empty() && self.num_connections != 0 {
|
HexDisplay::from(&ok.key), ok.peers.len());
|
||||||
debug!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
|
if ok.peers.is_empty() && self.num_connections != 0 {
|
||||||
results");
|
debug!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
|
||||||
|
results");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
KademliaEvent::GetRecordResult(res) => {
|
||||||
KademliaEvent::GetRecordResult(res) => {
|
let ev = match res {
|
||||||
let ev = match res {
|
Ok(ok) => {
|
||||||
Ok(ok) => {
|
let results = ok.records
|
||||||
let results = ok.records
|
.into_iter()
|
||||||
.into_iter()
|
.map(|r| (r.key, r.value))
|
||||||
.map(|r| (r.key, r.value))
|
.collect();
|
||||||
.collect();
|
|
||||||
|
|
||||||
DiscoveryOut::ValueFound(results)
|
DiscoveryOut::ValueFound(results)
|
||||||
|
}
|
||||||
|
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
|
||||||
|
trace!(target: "sub-libp2p",
|
||||||
|
"Libp2p => Failed to get record: {:?}", e);
|
||||||
|
DiscoveryOut::ValueNotFound(e.into_key())
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(target: "sub-libp2p",
|
||||||
|
"Libp2p => Failed to get record: {:?}", e);
|
||||||
|
DiscoveryOut::ValueNotFound(e.into_key())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||||
|
}
|
||||||
|
KademliaEvent::PutRecordResult(res) => {
|
||||||
|
let ev = match res {
|
||||||
|
Ok(ok) => DiscoveryOut::ValuePut(ok.key),
|
||||||
|
Err(e) => {
|
||||||
|
warn!(target: "sub-libp2p",
|
||||||
|
"Libp2p => Failed to put record: {:?}", e);
|
||||||
|
DiscoveryOut::ValuePutFailed(e.into_key())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||||
|
}
|
||||||
|
KademliaEvent::RepublishRecordResult(res) => {
|
||||||
|
match res {
|
||||||
|
Ok(ok) => debug!(target: "sub-libp2p",
|
||||||
|
"Libp2p => Record republished: {:?}",
|
||||||
|
ok.key),
|
||||||
|
Err(e) => warn!(target: "sub-libp2p",
|
||||||
|
"Libp2p => Republishing of record {:?} failed with: {:?}",
|
||||||
|
e.key(), e)
|
||||||
}
|
}
|
||||||
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
|
}
|
||||||
trace!(target: "sub-libp2p",
|
KademliaEvent::Discovered { .. } => {
|
||||||
"Libp2p => Failed to get record: {:?}", e);
|
// We are not interested in these events at the moment.
|
||||||
DiscoveryOut::ValueNotFound(e.into_key())
|
}
|
||||||
}
|
// We never start any other type of query.
|
||||||
Err(e) => {
|
e => {
|
||||||
warn!(target: "sub-libp2p",
|
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
|
||||||
"Libp2p => Failed to get record: {:?}", e);
|
|
||||||
DiscoveryOut::ValueNotFound(e.into_key())
|
|
||||||
}
|
|
||||||
};
|
|
||||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
|
||||||
}
|
|
||||||
KademliaEvent::PutRecordResult(res) => {
|
|
||||||
let ev = match res {
|
|
||||||
Ok(ok) => DiscoveryOut::ValuePut(ok.key),
|
|
||||||
Err(e) => {
|
|
||||||
warn!(target: "sub-libp2p",
|
|
||||||
"Libp2p => Failed to put record: {:?}", e);
|
|
||||||
DiscoveryOut::ValuePutFailed(e.into_key())
|
|
||||||
}
|
|
||||||
};
|
|
||||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
|
||||||
}
|
|
||||||
KademliaEvent::RepublishRecordResult(res) => {
|
|
||||||
match res {
|
|
||||||
Ok(ok) => debug!(target: "sub-libp2p",
|
|
||||||
"Libp2p => Record republished: {:?}",
|
|
||||||
ok.key),
|
|
||||||
Err(e) => warn!(target: "sub-libp2p",
|
|
||||||
"Libp2p => Republishing of record {:?} failed with: {:?}",
|
|
||||||
e.key(), e)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
KademliaEvent::Discovered { .. } => {
|
NetworkBehaviourAction::DialAddress { address } =>
|
||||||
// We are not interested in these events at the moment.
|
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
|
||||||
}
|
NetworkBehaviourAction::DialPeer { peer_id, condition } =>
|
||||||
// We never start any other type of query.
|
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
|
||||||
e => {
|
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
|
||||||
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
|
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
|
||||||
}
|
peer_id,
|
||||||
},
|
handler,
|
||||||
NetworkBehaviourAction::DialAddress { address } =>
|
event: (pid.clone(), event)
|
||||||
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
|
}),
|
||||||
NetworkBehaviourAction::DialPeer { peer_id, condition } =>
|
NetworkBehaviourAction::ReportObservedAddr { address } =>
|
||||||
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
|
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
|
||||||
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
|
}
|
||||||
return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }),
|
|
||||||
NetworkBehaviourAction::ReportObservedAddr { address } =>
|
|
||||||
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -511,7 +664,7 @@ mod tests {
|
|||||||
use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt};
|
use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt};
|
||||||
use libp2p::swarm::Swarm;
|
use libp2p::swarm::Swarm;
|
||||||
use std::{collections::HashSet, task::Poll};
|
use std::{collections::HashSet, task::Poll};
|
||||||
use super::{DiscoveryBehaviour, DiscoveryOut};
|
use super::{DiscoveryConfig, DiscoveryOut};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn discovery_working() {
|
fn discovery_working() {
|
||||||
@@ -540,13 +693,14 @@ mod tests {
|
|||||||
upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1)
|
upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1)
|
||||||
});
|
});
|
||||||
|
|
||||||
let behaviour = futures::executor::block_on({
|
let behaviour = {
|
||||||
let user_defined = user_defined.clone();
|
let mut config = DiscoveryConfig::new(keypair.public());
|
||||||
let keypair_public = keypair.public();
|
config.with_user_defined(user_defined.clone())
|
||||||
async move {
|
.allow_private_ipv4(true)
|
||||||
DiscoveryBehaviour::new(keypair_public, user_defined, false, true, 50).await
|
.discovery_limit(50);
|
||||||
}
|
config.finish()
|
||||||
});
|
};
|
||||||
|
|
||||||
let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
|
let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
|
||||||
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
|
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
|
||||||
|
|
||||||
|
|||||||
@@ -28,6 +28,7 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
behaviour::{Behaviour, BehaviourOut},
|
behaviour::{Behaviour, BehaviourOut},
|
||||||
config::{parse_addr, parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
|
config::{parse_addr, parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
|
||||||
|
discovery::DiscoveryConfig,
|
||||||
error::Error,
|
error::Error,
|
||||||
network_state::{
|
network_state::{
|
||||||
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
|
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
|
||||||
@@ -310,24 +311,37 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
|
|||||||
peerset_handle.clone(),
|
peerset_handle.clone(),
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
let mut behaviour = futures::executor::block_on(Behaviour::new(
|
|
||||||
|
let discovery_config = {
|
||||||
|
let mut config = DiscoveryConfig::new(local_public.clone());
|
||||||
|
config.with_user_defined(known_addresses);
|
||||||
|
config.discovery_limit(u64::from(params.network_config.out_peers) + 15);
|
||||||
|
config.add_protocol(params.protocol_id.clone());
|
||||||
|
|
||||||
|
match params.network_config.transport {
|
||||||
|
TransportConfig::MemoryOnly => {
|
||||||
|
config.with_mdns(false);
|
||||||
|
config.allow_private_ipv4(false);
|
||||||
|
}
|
||||||
|
TransportConfig::Normal { enable_mdns, allow_private_ipv4, .. } => {
|
||||||
|
config.with_mdns(enable_mdns);
|
||||||
|
config.allow_private_ipv4(allow_private_ipv4);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
config
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut behaviour = Behaviour::new(
|
||||||
protocol,
|
protocol,
|
||||||
params.role,
|
params.role,
|
||||||
user_agent,
|
user_agent,
|
||||||
local_public,
|
local_public,
|
||||||
known_addresses,
|
|
||||||
match params.network_config.transport {
|
|
||||||
TransportConfig::MemoryOnly => false,
|
|
||||||
TransportConfig::Normal { enable_mdns, .. } => enable_mdns,
|
|
||||||
},
|
|
||||||
match params.network_config.transport {
|
|
||||||
TransportConfig::MemoryOnly => false,
|
|
||||||
TransportConfig::Normal { allow_private_ipv4, .. } => allow_private_ipv4,
|
|
||||||
},
|
|
||||||
u64::from(params.network_config.out_peers) + 15,
|
|
||||||
block_requests,
|
block_requests,
|
||||||
light_client_handler
|
light_client_handler,
|
||||||
));
|
discovery_config
|
||||||
|
);
|
||||||
|
|
||||||
for (engine_id, protocol_name) in ¶ms.network_config.notifications_protocols {
|
for (engine_id, protocol_name) in ¶ms.network_config.notifications_protocols {
|
||||||
behaviour.register_notifications_protocol(*engine_id, protocol_name.clone());
|
behaviour.register_notifications_protocol(*engine_id, protocol_name.clone());
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user