Remove multiple DHTs support from Discovery (#12524)

This commit is contained in:
Dmitrii Markin
2022-10-20 19:30:01 +03:00
committed by GitHub
parent 42215038a3
commit 3a10019a10
4 changed files with 298 additions and 474 deletions
+7 -14
View File
@@ -34,7 +34,6 @@ use libp2p::{
use sc_consensus::import_queue::{IncomingBlock, RuntimeOrigin};
use sc_network_common::{
config::ProtocolId,
protocol::{
event::DhtEvent,
role::{ObservedRole, Roles},
@@ -79,7 +78,7 @@ pub enum BehaviourOut<B: BlockT> {
JustificationImport(RuntimeOrigin, B::Hash, NumberFor<B>, Justifications),
/// Started a random iterative Kademlia discovery query.
RandomKademliaStarted(Vec<ProtocolId>),
RandomKademliaStarted,
/// We have received a request from a peer and answered it.
///
@@ -267,25 +266,20 @@ where
self.discovery.add_known_address(peer_id, addr)
}
/// Returns the number of nodes in each Kademlia kbucket for each Kademlia instance.
/// Returns the number of nodes in each Kademlia kbucket.
///
/// Identifies Kademlia instances by their [`ProtocolId`] and kbuckets by the base 2 logarithm
/// of their lower bound.
pub fn num_entries_per_kbucket(
&mut self,
) -> impl ExactSizeIterator<Item = (&ProtocolId, Vec<(u32, usize)>)> {
/// Identifies kbuckets by the base 2 logarithm of their lower bound.
pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
self.discovery.num_entries_per_kbucket()
}
/// Returns the number of records in the Kademlia record stores.
pub fn num_kademlia_records(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
pub fn num_kademlia_records(&mut self) -> Option<usize> {
self.discovery.num_kademlia_records()
}
/// Returns the total size in bytes of all the records in the Kademlia record stores.
pub fn kademlia_records_total_size(
&mut self,
) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
self.discovery.kademlia_records_total_size()
}
@@ -438,8 +432,7 @@ impl<B: BlockT> From<DiscoveryOut> for BehaviourOut<B> {
BehaviourOut::Dht(DhtEvent::ValuePut(key), duration),
DiscoveryOut::ValuePutFailed(key, duration) =>
BehaviourOut::Dht(DhtEvent::ValuePutFailed(key), duration),
DiscoveryOut::RandomKademliaStarted(protocols) =>
BehaviourOut::RandomKademliaStarted(protocols),
DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted,
}
}
}
+269 -415
View File
@@ -66,11 +66,12 @@ use libp2p::{
mdns::{Mdns, MdnsConfig, MdnsEvent},
multiaddr::Protocol,
swarm::{
handler::multi::IntoMultiHandler, ConnectionHandler, DialError, IntoConnectionHandler,
NetworkBehaviour, NetworkBehaviourAction, PollParameters,
behaviour::toggle::{Toggle, ToggleIntoConnectionHandler},
ConnectionHandler, DialError, IntoConnectionHandler, NetworkBehaviour,
NetworkBehaviourAction, PollParameters,
},
};
use log::{debug, error, info, trace, warn};
use log::{debug, info, trace, warn};
use sc_network_common::{config::ProtocolId, utils::LruHashSet};
use sp_core::hexdisplay::HexDisplay;
use std::{
@@ -89,8 +90,8 @@ const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;
/// `DiscoveryBehaviour` configuration.
///
/// Note: In order to discover nodes or load and store values via Kademlia one has to add at least
/// one protocol via [`DiscoveryConfig::add_protocol`].
/// Note: In order to discover nodes or load and store values via Kademlia one has to add
/// Kademlia protocol via [`DiscoveryConfig::with_kademlia`].
pub struct DiscoveryConfig {
local_peer_id: PeerId,
permanent_addresses: Vec<(PeerId, Multiaddr)>,
@@ -100,7 +101,7 @@ pub struct DiscoveryConfig {
discovery_only_if_under_num: u64,
enable_mdns: bool,
kademlia_disjoint_query_paths: bool,
protocol_ids: HashSet<ProtocolId>,
kademlia_protocol_id: Option<ProtocolId>,
}
impl DiscoveryConfig {
@@ -115,7 +116,7 @@ impl DiscoveryConfig {
discovery_only_if_under_num: std::u64::MAX,
enable_mdns: false,
kademlia_disjoint_query_paths: false,
protocol_ids: HashSet::new(),
kademlia_protocol_id: None,
}
}
@@ -160,13 +161,8 @@ impl DiscoveryConfig {
}
/// Add discovery via Kademlia for the given protocol.
pub fn add_protocol(&mut self, id: ProtocolId) -> &mut Self {
if self.protocol_ids.contains(&id) {
warn!(target: "sub-libp2p", "Discovery already registered for protocol {:?}", id);
return self
}
self.protocol_ids.insert(id);
pub fn with_kademlia(&mut self, id: ProtocolId) -> &mut Self {
self.kademlia_protocol_id = Some(id);
self
}
@@ -189,37 +185,34 @@ impl DiscoveryConfig {
discovery_only_if_under_num,
enable_mdns,
kademlia_disjoint_query_paths,
protocol_ids,
kademlia_protocol_id,
} = self;
let kademlias = protocol_ids
.into_iter()
.map(|protocol_id| {
let proto_name = protocol_name_from_protocol_id(&protocol_id);
let kademlia = kademlia_protocol_id.map(|protocol_id| {
let proto_name = protocol_name_from_protocol_id(&protocol_id);
let mut config = KademliaConfig::default();
config.set_protocol_names(std::iter::once(proto_name.into()).collect());
// By default Kademlia attempts to insert all peers into its routing table once a
// dialing attempt succeeds. In order to control which peer is added, disable the
// auto-insertion and instead add peers manually.
config.set_kbucket_inserts(KademliaBucketInserts::Manual);
config.disjoint_query_paths(kademlia_disjoint_query_paths);
let mut config = KademliaConfig::default();
config.set_protocol_names(std::iter::once(proto_name.into()).collect());
// By default Kademlia attempts to insert all peers into its routing table once a
// dialing attempt succeeds. In order to control which peer is added, disable the
// auto-insertion and instead add peers manually.
config.set_kbucket_inserts(KademliaBucketInserts::Manual);
config.disjoint_query_paths(kademlia_disjoint_query_paths);
let store = MemoryStore::new(local_peer_id);
let mut kad = Kademlia::with_config(local_peer_id, store, config);
let store = MemoryStore::new(local_peer_id);
let mut kad = Kademlia::with_config(local_peer_id, store, config);
for (peer_id, addr) in &permanent_addresses {
kad.add_address(peer_id, addr.clone());
}
for (peer_id, addr) in &permanent_addresses {
kad.add_address(peer_id, addr.clone());
}
(protocol_id, kad)
})
.collect();
kad
});
DiscoveryBehaviour {
permanent_addresses,
ephemeral_addresses: HashMap::new(),
kademlias,
kademlia: Toggle::from(kademlia),
next_kad_random_query: if dht_random_walk {
Some(Delay::new(Duration::new(0, 0)))
} else {
@@ -259,8 +252,9 @@ pub struct DiscoveryBehaviour {
/// Same as `permanent_addresses`, except that addresses that fail to reach a peer are
/// removed.
ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
/// Kademlia requests and answers.
kademlias: HashMap<ProtocolId, Kademlia<MemoryStore>>,
/// Kademlia requests and answers. Even though it's wrapped in `Toggle`, currently
/// it's always enabled in `NetworkWorker::new()`.
kademlia: Toggle<Kademlia<MemoryStore>>,
/// Discovers nodes on the local network.
mdns: Option<Mdns>,
/// Stream that fires when we need to perform the next random Kademlia query. `None` if
@@ -289,7 +283,7 @@ impl DiscoveryBehaviour {
/// Returns the list of nodes that we know exist in the network.
pub fn known_peers(&mut self) -> HashSet<PeerId> {
let mut peers = HashSet::new();
for k in self.kademlias.values_mut() {
if let Some(k) = self.kademlia.as_mut() {
for b in k.kbuckets() {
for e in b.iter() {
if !peers.contains(e.node.key.preimage()) {
@@ -309,7 +303,7 @@ impl DiscoveryBehaviour {
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
if !addrs_list.iter().any(|a| *a == addr) {
for k in self.kademlias.values_mut() {
if let Some(k) = self.kademlia.as_mut() {
k.add_address(&peer_id, addr.clone());
}
@@ -318,8 +312,8 @@ impl DiscoveryBehaviour {
}
}
/// Add a self-reported address of a remote peer to the k-buckets of the supported
/// DHTs (`supported_protocols`).
/// Add a self-reported address of a remote peer to the k-buckets of the DHT
/// if it has compatible `supported_protocols`.
///
/// **Note**: It is important that you call this method. The discovery mechanism will not
/// automatically add connecting peers to the Kademlia k-buckets.
@@ -329,13 +323,15 @@ impl DiscoveryBehaviour {
supported_protocols: &[impl AsRef<[u8]>],
addr: Multiaddr,
) {
if !self.allow_non_globals_in_dht && !self.can_add_to_dht(&addr) {
trace!(target: "sub-libp2p", "Ignoring self-reported non-global address {} from {}.", addr, peer_id);
return
}
if let Some(kademlia) = self.kademlia.as_mut() {
if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
trace!(
target: "sub-libp2p",
"Ignoring self-reported non-global address {} from {}.", addr, peer_id
);
return
}
let mut added = false;
for kademlia in self.kademlias.values_mut() {
if let Some(matching_protocol) = supported_protocols
.iter()
.find(|p| kademlia.protocol_names().iter().any(|k| k.as_ref() == p.as_ref()))
@@ -346,24 +342,21 @@ impl DiscoveryBehaviour {
addr, peer_id, String::from_utf8_lossy(matching_protocol.as_ref()),
);
kademlia.add_address(peer_id, addr.clone());
added = true;
} else {
trace!(
target: "sub-libp2p",
"Ignoring self-reported address {} from {} as remote node is not part of the \
Kademlia DHT supported by the local node.", addr, peer_id,
);
}
}
if !added {
trace!(
target: "sub-libp2p",
"Ignoring self-reported address {} from {} as remote node is not part of any \
Kademlia DHTs supported by the local node.", addr, peer_id,
);
}
}
/// Start fetching a record from the DHT.
///
/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
pub fn get_value(&mut self, key: record::Key) {
for k in self.kademlias.values_mut() {
if let Some(k) = self.kademlia.as_mut() {
k.get_record(key.clone(), Quorum::One);
}
}
@@ -373,7 +366,7 @@ impl DiscoveryBehaviour {
///
/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
pub fn put_value(&mut self, key: record::Key, value: Vec<u8>) {
for k in self.kademlias.values_mut() {
if let Some(k) = self.kademlia.as_mut() {
if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
warn!(target: "sub-libp2p", "Libp2p => Failed to put record: {:?}", e);
self.pending_events
@@ -386,37 +379,27 @@ impl DiscoveryBehaviour {
///
/// Identifies Kademlia instances by their [`ProtocolId`] and kbuckets by the base 2 logarithm
/// of their lower bound.
pub fn num_entries_per_kbucket(
&mut self,
) -> impl ExactSizeIterator<Item = (&ProtocolId, Vec<(u32, usize)>)> {
self.kademlias.iter_mut().map(|(id, kad)| {
let buckets = kad
.kbuckets()
pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
self.kademlia.as_mut().map(|kad| {
kad.kbuckets()
.map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
.collect();
(id, buckets)
.collect()
})
}
/// Returns the number of records in the Kademlia record stores.
pub fn num_kademlia_records(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
pub fn num_kademlia_records(&mut self) -> Option<usize> {
// Note that this code is ok only because we use a `MemoryStore`.
self.kademlias.iter_mut().map(|(id, kad)| {
let num = kad.store_mut().records().count();
(id, num)
})
self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
}
/// Returns the total size in bytes of all the records in the Kademlia record stores.
pub fn kademlia_records_total_size(
&mut self,
) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
// Note that this code is ok only because we use a `MemoryStore`. If the records were
// for example stored on disk, this would load every single one of them every single time.
self.kademlias.iter_mut().map(|(id, kad)| {
let size = kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len());
(id, size)
})
self.kademlia
.as_mut()
.map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
}
/// Can the given `Multiaddr` be put into the DHT?
@@ -425,7 +408,7 @@ impl DiscoveryBehaviour {
// NB: Currently all DNS names are allowed and no check for TLD suffixes is done
// because the set of valid domains is highly dynamic and would require frequent
// updates, for example by utilising publicsuffix.org or IANA.
pub fn can_add_to_dht(&self, addr: &Multiaddr) -> bool {
pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
let ip = match addr.iter().next() {
Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
@@ -435,29 +418,6 @@ impl DiscoveryBehaviour {
};
ip.is_global()
}
fn new_handler_with_replacement(
&mut self,
pid: ProtocolId,
handler: KademliaHandlerProto<QueryId>,
) -> <DiscoveryBehaviour as NetworkBehaviour>::ConnectionHandler {
let mut handlers: HashMap<_, _> = self
.kademlias
.iter_mut()
.map(|(p, k)| (p.clone(), NetworkBehaviour::new_handler(k)))
.collect();
if let Some(h) = handlers.get_mut(&pid) {
*h = handler
}
IntoMultiHandler::try_from_iter(handlers).expect(
"There can be at most one handler per `ProtocolId` and protocol names contain the \
`ProtocolId` so no two protocol names in `self.kademlias` can be equal which is the \
only error `try_from_iter` can return, therefore this call is guaranteed to succeed; \
qed",
)
}
}
/// Event generated by the `DiscoveryBehaviour`.
@@ -498,28 +458,18 @@ pub enum DiscoveryOut {
/// Returning the corresponding key as well as the request duration.
ValuePutFailed(record::Key, Duration),
/// Started a random Kademlia query for each DHT identified by the given `ProtocolId`s.
/// Started a random Kademlia query.
///
/// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`.
RandomKademliaStarted(Vec<ProtocolId>),
RandomKademliaStarted,
}
impl NetworkBehaviour for DiscoveryBehaviour {
type ConnectionHandler = IntoMultiHandler<ProtocolId, KademliaHandlerProto<QueryId>>;
type ConnectionHandler = ToggleIntoConnectionHandler<KademliaHandlerProto<QueryId>>;
type OutEvent = DiscoveryOut;
fn new_handler(&mut self) -> Self::ConnectionHandler {
let iter = self
.kademlias
.iter_mut()
.map(|(p, k)| (p.clone(), NetworkBehaviour::new_handler(k)));
IntoMultiHandler::try_from_iter(iter).expect(
"There can be at most one handler per `ProtocolId` and protocol names contain the \
`ProtocolId` so no two protocol names in `self.kademlias` can be equal which is the \
only error `try_from_iter` can return, therefore this call is guaranteed to succeed; \
qed",
)
self.kademlia.new_handler()
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
@@ -534,10 +484,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
{
let mut list_to_filter = Vec::new();
for k in self.kademlias.values_mut() {
list_to_filter.extend(k.addresses_of_peer(peer_id))
}
let mut list_to_filter = self.kademlia.addresses_of_peer(peer_id);
if let Some(ref mut mdns) = self.mdns {
list_to_filter.extend(mdns.addresses_of_peer(peer_id));
@@ -566,9 +513,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
old: &ConnectedPoint,
new: &ConnectedPoint,
) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_address_change(k, peer_id, connection_id, old, new);
}
self.kademlia.inject_address_change(peer_id, connection_id, old, new)
}
fn inject_connection_established(
@@ -580,16 +525,13 @@ impl NetworkBehaviour for DiscoveryBehaviour {
other_established: usize,
) {
self.num_connections += 1;
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_connection_established(
k,
peer_id,
conn,
endpoint,
failed_addresses,
other_established,
)
}
self.kademlia.inject_connection_established(
peer_id,
conn,
endpoint,
failed_addresses,
other_established,
)
}
fn inject_connection_closed(
@@ -601,23 +543,19 @@ impl NetworkBehaviour for DiscoveryBehaviour {
remaining_established: usize,
) {
self.num_connections -= 1;
for (pid, event) in handler.into_iter() {
if let Some(kad) = self.kademlias.get_mut(&pid) {
kad.inject_connection_closed(peer_id, conn, endpoint, event, remaining_established)
} else {
error!(
target: "sub-libp2p",
"inject_connection_closed: no kademlia instance registered for protocol {:?}",
pid,
)
}
}
self.kademlia.inject_connection_closed(
peer_id,
conn,
endpoint,
handler,
remaining_established,
)
}
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_: Self::ConnectionHandler,
handler: Self::ConnectionHandler,
error: &DialError,
) {
if let Some(peer_id) = peer_id {
@@ -630,32 +568,22 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}
for k in self.kademlias.values_mut() {
let handler = k.new_handler();
NetworkBehaviour::inject_dial_failure(k, peer_id, handler, error);
}
self.kademlia.inject_dial_failure(peer_id, handler, error)
}
fn inject_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
(pid, event): <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
) {
if let Some(kad) = self.kademlias.get_mut(&pid) {
return kad.inject_event(peer_id, connection, event)
}
error!(
target: "sub-libp2p",
"inject_node_event: no kademlia instance registered for protocol {:?}",
pid,
)
self.kademlia.inject_event(peer_id, connection, event)
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
let new_addr = addr.clone().with(Protocol::P2p(self.local_peer_id.into()));
if self.can_add_to_dht(addr) {
if Self::can_add_to_dht(addr) {
// NOTE: we might re-discover the same address multiple times
// in which case we just want to refrain from logging.
if self.known_external_addresses.insert(new_addr.clone()) {
@@ -667,36 +595,26 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_new_external_addr(k, addr)
}
self.kademlia.inject_new_external_addr(addr)
}
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
// We intentionally don't remove the element from `known_external_addresses` in order
// to not print the log line again.
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_expired_external_addr(k, addr)
}
self.kademlia.inject_expired_external_addr(addr)
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_expired_listen_addr(k, id, addr)
}
self.kademlia.inject_expired_listen_addr(id, addr)
}
fn inject_new_listener(&mut self, id: ListenerId) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_new_listener(k, id)
}
self.kademlia.inject_new_listener(id)
}
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_new_listen_addr(k, id, addr)
}
self.kademlia.inject_new_listen_addr(id, addr)
}
fn inject_listen_failure(&mut self, _: &Multiaddr, _: &Multiaddr, _: Self::ConnectionHandler) {
@@ -704,15 +622,11 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_listener_error(k, id, err)
}
self.kademlia.inject_listener_error(id, err)
}
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_listener_closed(k, id, reason)
}
self.kademlia.inject_listener_closed(id, reason)
}
fn poll(
@@ -726,198 +640,189 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
// Poll the stream that fires when we need to start a random Kademlia query.
if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
while next_kad_random_query.poll_unpin(cx).is_ready() {
let actually_started = if self.num_connections < self.discovery_only_if_under_num {
let random_peer_id = PeerId::random();
debug!(
target: "sub-libp2p",
"Libp2p <= Starting random Kademlia request for {:?}",
random_peer_id,
);
for k in self.kademlias.values_mut() {
k.get_closest_peers(random_peer_id);
if let Some(kademlia) = self.kademlia.as_mut() {
if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
while next_kad_random_query.poll_unpin(cx).is_ready() {
let actually_started =
if self.num_connections < self.discovery_only_if_under_num {
let random_peer_id = PeerId::random();
debug!(
target: "sub-libp2p",
"Libp2p <= Starting random Kademlia request for {:?}",
random_peer_id,
);
kademlia.get_closest_peers(random_peer_id);
true
} else {
debug!(
target: "sub-libp2p",
"Kademlia paused due to high number of connections ({})",
self.num_connections
);
false
};
// Schedule the next random query with exponentially increasing delay,
// capped at 60 seconds.
*next_kad_random_query = Delay::new(self.duration_to_next_kad);
self.duration_to_next_kad =
cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));
if actually_started {
let ev = DiscoveryOut::RandomKademliaStarted;
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
}
true
} else {
debug!(
target: "sub-libp2p",
"Kademlia paused due to high number of connections ({})",
self.num_connections
);
false
};
// Schedule the next random query with exponentially increasing delay,
// capped at 60 seconds.
*next_kad_random_query = Delay::new(self.duration_to_next_kad);
self.duration_to_next_kad =
cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));
if actually_started {
let ev = DiscoveryOut::RandomKademliaStarted(
self.kademlias.keys().cloned().collect(),
);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
}
}
}
// Poll Kademlias.
for (pid, kademlia) in &mut self.kademlias {
while let Poll::Ready(ev) = kademlia.poll(cx, params) {
match ev {
NetworkBehaviourAction::GenerateEvent(ev) => match ev {
KademliaEvent::RoutingUpdated { peer, .. } => {
let ev = DiscoveryOut::Discovered(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
while let Poll::Ready(ev) = self.kademlia.poll(cx, params) {
match ev {
NetworkBehaviourAction::GenerateEvent(ev) => match ev {
KademliaEvent::RoutingUpdated { peer, .. } => {
let ev = DiscoveryOut::Discovered(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
KademliaEvent::UnroutablePeer { peer, .. } => {
let ev = DiscoveryOut::UnroutablePeer(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
KademliaEvent::RoutablePeer { peer, .. } => {
let ev = DiscoveryOut::Discovered(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
KademliaEvent::PendingRoutablePeer { .. } |
KademliaEvent::InboundRequest { .. } => {
// We are not interested in this event at the moment.
},
KademliaEvent::OutboundQueryCompleted {
result: QueryResult::GetClosestPeers(res),
..
} => match res {
Err(GetClosestPeersError::Timeout { key, peers }) => {
debug!(
target: "sub-libp2p",
"Libp2p => Query for {:?} timed out with {} results",
HexDisplay::from(&key), peers.len(),
);
},
KademliaEvent::UnroutablePeer { peer, .. } => {
let ev = DiscoveryOut::UnroutablePeer(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
KademliaEvent::RoutablePeer { peer, .. } => {
let ev = DiscoveryOut::Discovered(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
KademliaEvent::PendingRoutablePeer { .. } |
KademliaEvent::InboundRequest { .. } => {
// We are not interested in this event at the moment.
},
KademliaEvent::OutboundQueryCompleted {
result: QueryResult::GetClosestPeers(res),
..
} => match res {
Err(GetClosestPeersError::Timeout { key, peers }) => {
Ok(ok) => {
trace!(
target: "sub-libp2p",
"Libp2p => Query for {:?} yielded {:?} results",
HexDisplay::from(&ok.key), ok.peers.len(),
);
if ok.peers.is_empty() && self.num_connections != 0 {
debug!(
target: "sub-libp2p",
"Libp2p => Query for {:?} timed out with {} results",
HexDisplay::from(&key), peers.len(),
"Libp2p => Random Kademlia query has yielded empty results",
);
},
}
},
},
KademliaEvent::OutboundQueryCompleted {
result: QueryResult::GetRecord(res),
stats,
..
} => {
let ev = match res {
Ok(ok) => {
let results = ok
.records
.into_iter()
.map(|r| (r.record.key, r.record.value))
.collect();
DiscoveryOut::ValueFound(
results,
stats.duration().unwrap_or_default(),
)
},
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
trace!(
target: "sub-libp2p",
"Libp2p => Query for {:?} yielded {:?} results",
HexDisplay::from(&ok.key), ok.peers.len(),
"Libp2p => Failed to get record: {:?}",
e,
);
if ok.peers.is_empty() && self.num_connections != 0 {
debug!(
target: "sub-libp2p",
"Libp2p => Random Kademlia query has yielded empty results",
);
}
},
},
KademliaEvent::OutboundQueryCompleted {
result: QueryResult::GetRecord(res),
stats,
..
} => {
let ev = match res {
Ok(ok) => {
let results = ok
.records
.into_iter()
.map(|r| (r.record.key, r.record.value))
.collect();
DiscoveryOut::ValueFound(
results,
stats.duration().unwrap_or_default(),
)
},
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
trace!(
target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}",
e,
);
DiscoveryOut::ValueNotFound(
e.into_key(),
stats.duration().unwrap_or_default(),
)
},
Err(e) => {
debug!(
target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}",
e,
);
DiscoveryOut::ValueNotFound(
e.into_key(),
stats.duration().unwrap_or_default(),
)
},
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
KademliaEvent::OutboundQueryCompleted {
result: QueryResult::PutRecord(res),
stats,
..
} => {
let ev = match res {
Ok(ok) => DiscoveryOut::ValuePut(
ok.key,
DiscoveryOut::ValueNotFound(
e.into_key(),
stats.duration().unwrap_or_default(),
),
Err(e) => {
debug!(
target: "sub-libp2p",
"Libp2p => Failed to put record: {:?}",
e,
);
DiscoveryOut::ValuePutFailed(
e.into_key(),
stats.duration().unwrap_or_default(),
)
},
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
KademliaEvent::OutboundQueryCompleted {
result: QueryResult::RepublishRecord(res),
..
} => match res {
Ok(ok) => debug!(
target: "sub-libp2p",
"Libp2p => Record republished: {:?}",
ok.key,
),
Err(e) => debug!(
target: "sub-libp2p",
"Libp2p => Republishing of record {:?} failed with: {:?}",
e.key(), e,
),
},
// We never start any other type of query.
KademliaEvent::OutboundQueryCompleted { result: e, .. } => {
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
},
)
},
Err(e) => {
debug!(
target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}",
e,
);
DiscoveryOut::ValueNotFound(
e.into_key(),
stats.duration().unwrap_or_default(),
)
},
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
NetworkBehaviourAction::Dial { opts, handler } => {
let pid = pid.clone();
let handler = self.new_handler_with_replacement(pid, handler);
return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler })
KademliaEvent::OutboundQueryCompleted {
result: QueryResult::PutRecord(res),
stats,
..
} => {
let ev = match res {
Ok(ok) =>
DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default()),
Err(e) => {
debug!(
target: "sub-libp2p",
"Libp2p => Failed to put record: {:?}",
e,
);
DiscoveryOut::ValuePutFailed(
e.into_key(),
stats.duration().unwrap_or_default(),
)
},
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event: (pid.clone(), event),
}),
NetworkBehaviourAction::ReportObservedAddr { address, score } =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
address,
score,
}),
NetworkBehaviourAction::CloseConnection { peer_id, connection } =>
return Poll::Ready(NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
}),
}
KademliaEvent::OutboundQueryCompleted {
result: QueryResult::RepublishRecord(res),
..
} => match res {
Ok(ok) => debug!(
target: "sub-libp2p",
"Libp2p => Record republished: {:?}",
ok.key,
),
Err(e) => debug!(
target: "sub-libp2p",
"Libp2p => Republishing of record {:?} failed with: {:?}",
e.key(), e,
),
},
// We never start any other type of query.
KademliaEvent::OutboundQueryCompleted { result: e, .. } => {
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
},
},
NetworkBehaviourAction::Dial { opts, handler } =>
return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }),
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event,
}),
NetworkBehaviourAction::ReportObservedAddr { address, score } =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
address,
score,
}),
NetworkBehaviourAction::CloseConnection { peer_id, connection } =>
return Poll::Ready(NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
}),
}
}
@@ -1014,7 +919,7 @@ mod tests {
.allow_private_ipv4(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.add_protocol(protocol_id.clone());
.with_kademlia(protocol_id.clone());
config.finish()
};
@@ -1078,7 +983,7 @@ mod tests {
to_discover[swarm_n].remove(&other);
},
DiscoveryOut::RandomKademliaStarted(_) => {},
DiscoveryOut::RandomKademliaStarted => {},
e => {
panic!("Unexpected event: {:?}", e)
},
@@ -1117,7 +1022,7 @@ mod tests {
.allow_private_ipv4(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.add_protocol(supported_protocol_id.clone());
.with_kademlia(supported_protocol_id.clone());
config.finish()
};
@@ -1131,7 +1036,8 @@ mod tests {
remote_addr.clone(),
);
for kademlia in discovery.kademlias.values_mut() {
{
let kademlia = discovery.kademlia.as_mut().unwrap();
assert!(
kademlia
.kbucket(remote_peer_id)
@@ -1148,66 +1054,14 @@ mod tests {
remote_addr.clone(),
);
for kademlia in discovery.kademlias.values_mut() {
assert_eq!(
1,
kademlia
.kbucket(remote_peer_id)
.expect("Remote peer id not to be equal to local peer id.")
.num_entries(),
"Expect peer with supported protocol to be added."
);
}
}
#[test]
fn discovery_adds_peer_to_kademlia_of_same_protocol_only() {
let protocol_a = ProtocolId::from("a");
let protocol_b = ProtocolId::from("b");
let mut discovery = {
let keypair = Keypair::generate_ed25519();
let mut config = DiscoveryConfig::new(keypair.public());
config
.allow_private_ipv4(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.add_protocol(protocol_a.clone())
.add_protocol(protocol_b.clone());
config.finish()
};
let remote_peer_id = PeerId::random();
let remote_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
// Add remote peer with `protocol_a` only.
discovery.add_self_reported_address(
&remote_peer_id,
&[protocol_name_from_protocol_id(&protocol_a)],
remote_addr.clone(),
);
let kademlia = discovery.kademlia.as_mut().unwrap();
assert_eq!(
1,
discovery
.kademlias
.get_mut(&protocol_a)
.expect("Kademlia instance to exist.")
kademlia
.kbucket(remote_peer_id)
.expect("Remote peer id not to be equal to local peer id.")
.num_entries(),
"Expected remote peer to be added to `protocol_a` Kademlia instance.",
);
assert!(
discovery
.kademlias
.get_mut(&protocol_b)
.expect("Kademlia instance to exist.")
.kbucket(remote_peer_id)
.expect("Remote peer id not to be equal to local peer id.")
.is_empty(),
"Expected remote peer not to be added to `protocol_b` Kademlia instance.",
"Expect peer with supported protocol to be added."
);
}
}
+9 -23
View File
@@ -284,7 +284,7 @@ where
config.discovery_limit(
u64::from(params.network_config.default_peers_set.out_peers) + 15,
);
config.add_protocol(params.protocol_id.clone());
config.with_kademlia(params.protocol_id.clone());
config.with_dht_random_walk(params.network_config.enable_dht_random_walk);
config.allow_non_globals_in_dht(params.network_config.allow_non_globals_in_dht);
config.use_kademlia_disjoint_query_paths(
@@ -1665,16 +1665,9 @@ where
.user_protocol_mut()
.add_default_set_discovered_nodes(iter::once(peer_id));
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted(
protocols,
))) =>
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted)) =>
if let Some(metrics) = this.metrics.as_ref() {
for protocol in protocols {
metrics
.kademlia_random_queries_total
.with_label_values(&[protocol.as_ref()])
.inc();
}
metrics.kademlia_random_queries_total.inc();
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
remote,
@@ -2015,28 +2008,21 @@ where
this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
if let Some(metrics) = this.metrics.as_ref() {
for (proto, buckets) in this.network_service.behaviour_mut().num_entries_per_kbucket() {
if let Some(buckets) = this.network_service.behaviour_mut().num_entries_per_kbucket() {
for (lower_ilog2_bucket_bound, num_entries) in buckets {
metrics
.kbuckets_num_nodes
.with_label_values(&[proto.as_ref(), &lower_ilog2_bucket_bound.to_string()])
.with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
.set(num_entries as u64);
}
}
for (proto, num_entries) in this.network_service.behaviour_mut().num_kademlia_records()
{
metrics
.kademlia_records_count
.with_label_values(&[proto.as_ref()])
.set(num_entries as u64);
if let Some(num_entries) = this.network_service.behaviour_mut().num_kademlia_records() {
metrics.kademlia_records_count.set(num_entries as u64);
}
for (proto, num_entries) in
if let Some(num_entries) =
this.network_service.behaviour_mut().kademlia_records_total_size()
{
metrics
.kademlia_records_sizes_total
.with_label_values(&[proto.as_ref()])
.set(num_entries as u64);
metrics.kademlia_records_sizes_total.set(num_entries as u64);
}
metrics
.peerset_num_discovered
+13 -22
View File
@@ -59,9 +59,9 @@ pub struct Metrics {
pub incoming_connections_total: Counter<U64>,
pub issued_light_requests: Counter<U64>,
pub kademlia_query_duration: HistogramVec,
pub kademlia_random_queries_total: CounterVec<U64>,
pub kademlia_records_count: GaugeVec<U64>,
pub kademlia_records_sizes_total: GaugeVec<U64>,
pub kademlia_random_queries_total: Counter<U64>,
pub kademlia_records_count: Gauge<U64>,
pub kademlia_records_sizes_total: Gauge<U64>,
pub kbuckets_num_nodes: GaugeVec<U64>,
pub listeners_local_addresses: Gauge<U64>,
pub listeners_errors_total: Counter<U64>,
@@ -138,33 +138,24 @@ impl Metrics {
},
&["type"]
)?, registry)?,
kademlia_random_queries_total: prometheus::register(CounterVec::new(
Opts::new(
"substrate_sub_libp2p_kademlia_random_queries_total",
"Number of random Kademlia queries started"
),
&["protocol"]
kademlia_random_queries_total: prometheus::register(Counter::new(
"substrate_sub_libp2p_kademlia_random_queries_total",
"Number of random Kademlia queries started",
)?, registry)?,
kademlia_records_count: prometheus::register(GaugeVec::new(
Opts::new(
"substrate_sub_libp2p_kademlia_records_count",
"Number of records in the Kademlia records store"
),
&["protocol"]
kademlia_records_count: prometheus::register(Gauge::new(
"substrate_sub_libp2p_kademlia_records_count",
"Number of records in the Kademlia records store",
)?, registry)?,
kademlia_records_sizes_total: prometheus::register(GaugeVec::new(
Opts::new(
"substrate_sub_libp2p_kademlia_records_sizes_total",
"Total size of all the records in the Kademlia records store"
),
&["protocol"]
kademlia_records_sizes_total: prometheus::register(Gauge::new(
"substrate_sub_libp2p_kademlia_records_sizes_total",
"Total size of all the records in the Kademlia records store",
)?, registry)?,
kbuckets_num_nodes: prometheus::register(GaugeVec::new(
Opts::new(
"substrate_sub_libp2p_kbuckets_num_nodes",
"Number of nodes per kbucket per Kademlia instance"
),
&["protocol", "lower_ilog2_bucket_bound"]
&["lower_ilog2_bucket_bound"]
)?, registry)?,
listeners_local_addresses: prometheus::register(Gauge::new(
"substrate_sub_libp2p_listeners_local_addresses",