Use a Kademlia instance per ProtocolId. (#5045)

This commit is contained in:
Toralf Wittner
2020-04-16 10:43:40 +02:00
committed by GitHub
parent 980b635c8d
commit 4db45a85de
3 changed files with 358 additions and 199 deletions
+4 -13
View File
@@ -16,7 +16,7 @@
use crate::{ use crate::{
config::Role, config::Role,
debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, debug_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
Event, ObservedRole, DhtEvent, ExHashT, Event, ObservedRole, DhtEvent, ExHashT,
}; };
use crate::protocol::{self, light_client_handler, message::Roles, CustomMessageOutcome, Protocol}; use crate::protocol::{self, light_client_handler, message::Roles, CustomMessageOutcome, Protocol};
@@ -67,28 +67,19 @@ pub enum BehaviourOut<B: BlockT> {
impl<B: BlockT, H: ExHashT> Behaviour<B, H> { impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
/// Builds a new `Behaviour`. /// Builds a new `Behaviour`.
pub async fn new( pub fn new(
substrate: Protocol<B, H>, substrate: Protocol<B, H>,
role: Role, role: Role,
user_agent: String, user_agent: String,
local_public_key: PublicKey, local_public_key: PublicKey,
known_addresses: Vec<(PeerId, Multiaddr)>,
enable_mdns: bool,
allow_private_ipv4: bool,
discovery_only_if_under_num: u64,
block_requests: protocol::BlockRequests<B>, block_requests: protocol::BlockRequests<B>,
light_client_handler: protocol::LightClientHandler<B>, light_client_handler: protocol::LightClientHandler<B>,
disco_config: DiscoveryConfig,
) -> Self { ) -> Self {
Behaviour { Behaviour {
substrate, substrate,
debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()), debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()),
discovery: DiscoveryBehaviour::new( discovery: disco_config.finish(),
local_public_key,
known_addresses,
enable_mdns,
allow_private_ipv4,
discovery_only_if_under_num,
).await,
block_requests, block_requests,
light_client_handler, light_client_handler,
events: Vec::new(), events: Vec::new(),
+327 -173
View File
@@ -45,30 +45,162 @@
//! of a node's address, you must call `add_self_reported_address`. //! of a node's address, you must call `add_self_reported_address`.
//! //!
use crate::config::ProtocolId;
use futures::prelude::*; use futures::prelude::*;
use futures_timer::Delay; use futures_timer::Delay;
use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey}; use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey};
use libp2p::swarm::{ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use libp2p::kad::{Kademlia, KademliaEvent, Quorum, Record}; use libp2p::swarm::protocols_handler::multi::MultiHandler;
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, Quorum, Record};
use libp2p::kad::GetClosestPeersError; use libp2p::kad::GetClosestPeersError;
use libp2p::kad::handler::KademliaHandler;
use libp2p::kad::QueryId;
use libp2p::kad::record::{self, store::MemoryStore}; use libp2p::kad::record::{self, store::MemoryStore};
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
use libp2p::{swarm::toggle::Toggle}; use libp2p::swarm::toggle::Toggle;
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
use libp2p::mdns::{Mdns, MdnsEvent}; use libp2p::mdns::{Mdns, MdnsEvent};
use libp2p::multiaddr::Protocol; use libp2p::multiaddr::Protocol;
use log::{debug, info, trace, warn, error}; use log::{debug, info, trace, warn, error};
use std::{cmp, collections::VecDeque, io, time::Duration}; use std::{cmp, collections::{HashMap, HashSet, VecDeque}, io, time::Duration};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use sp_core::hexdisplay::HexDisplay; use sp_core::hexdisplay::HexDisplay;
/// `DiscoveryBehaviour` configuration.
pub struct DiscoveryConfig {
local_peer_id: PeerId,
user_defined: Vec<(PeerId, Multiaddr)>,
allow_private_ipv4: bool,
discovery_only_if_under_num: u64,
enable_mdns: bool,
kademlias: HashMap<ProtocolId, Kademlia<MemoryStore>>
}
impl DiscoveryConfig {
/// Crate a default configuration with the given public key.
pub fn new(local_public_key: PublicKey) -> Self {
let mut this = DiscoveryConfig {
local_peer_id: local_public_key.into_peer_id(),
user_defined: Vec::new(),
allow_private_ipv4: true,
discovery_only_if_under_num: std::u64::MAX,
enable_mdns: false,
kademlias: HashMap::new()
};
// Temporary hack to retain backwards compatibility.
// We should eventually remove the special handling of DEFAULT_PROTO_NAME.
let proto_id = ProtocolId::from(libp2p::kad::protocol::DEFAULT_PROTO_NAME);
let proto_name = Vec::from(proto_id.as_bytes());
this.add_kademlia(proto_id, proto_name);
this
}
/// Set the number of active connections at which we pause discovery.
pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
self.discovery_only_if_under_num = limit;
self
}
/// Set custom nodes which never expire, e.g. bootstrap or reserved nodes.
pub fn with_user_defined<I>(&mut self, user_defined: I) -> &mut Self
where
I: IntoIterator<Item = (PeerId, Multiaddr)>
{
for (peer_id, addr) in user_defined {
for kad in self.kademlias.values_mut() {
kad.add_address(&peer_id, addr.clone())
}
self.user_defined.push((peer_id, addr))
}
self
}
/// Should private IPv4 addresses be reported?
pub fn allow_private_ipv4(&mut self, value: bool) -> &mut Self {
self.allow_private_ipv4 = value;
self
}
/// Should MDNS discovery be supported?
pub fn with_mdns(&mut self, value: bool) -> &mut Self {
if value && cfg!(target_os = "unknown") {
log::warn!(target: "sub-libp2p", "mDNS is not available on this platform")
}
self.enable_mdns = value;
self
}
/// Add discovery via Kademlia for the given protocol.
pub fn add_protocol(&mut self, p: ProtocolId) -> &mut Self {
// NB: If this protocol name derivation is changed, check if
// `DiscoveryBehaviour::new_handler` is still correct.
let proto_name = {
let mut v = vec![b'/'];
v.extend_from_slice(p.as_bytes());
v.extend_from_slice(b"/kad");
v
};
self.add_kademlia(p, proto_name);
self
}
fn add_kademlia(&mut self, id: ProtocolId, proto_name: Vec<u8>) {
if self.kademlias.contains_key(&id) {
warn!(target: "sub-libp2p", "Discovery already registered for protocol {:?}", id);
return
}
let mut config = KademliaConfig::default();
config.set_protocol_name(proto_name);
let store = MemoryStore::new(self.local_peer_id.clone());
let mut kad = Kademlia::with_config(self.local_peer_id.clone(), store, config);
for (peer_id, addr) in &self.user_defined {
kad.add_address(peer_id, addr.clone());
}
self.kademlias.insert(id, kad);
}
/// Create a `DiscoveryBehaviour` from this config.
pub fn finish(self) -> DiscoveryBehaviour {
DiscoveryBehaviour {
user_defined: self.user_defined,
kademlias: self.kademlias,
next_kad_random_query: Delay::new(Duration::new(0, 0)),
duration_to_next_kad: Duration::from_secs(1),
discoveries: VecDeque::new(),
local_peer_id: self.local_peer_id,
num_connections: 0,
allow_private_ipv4: self.allow_private_ipv4,
discovery_only_if_under_num: self.discovery_only_if_under_num,
#[cfg(not(target_os = "unknown"))]
mdns: if self.enable_mdns {
match Mdns::new() {
Ok(mdns) => Some(mdns).into(),
Err(err) => {
warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
None.into()
}
}
} else {
None.into()
},
}
}
}
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network. /// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
pub struct DiscoveryBehaviour { pub struct DiscoveryBehaviour {
/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and /// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
/// reserved nodes. /// reserved nodes.
user_defined: Vec<(PeerId, Multiaddr)>, user_defined: Vec<(PeerId, Multiaddr)>,
/// Kademlia requests and answers. /// Kademlia requests and answers.
kademlia: Kademlia<MemoryStore>, kademlias: HashMap<ProtocolId, Kademlia<MemoryStore>>,
/// Discovers nodes on the local network. /// Discovers nodes on the local network.
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
mdns: Toggle<Mdns>, mdns: Toggle<Mdns>,
@@ -90,56 +222,13 @@ pub struct DiscoveryBehaviour {
} }
impl DiscoveryBehaviour { impl DiscoveryBehaviour {
/// Builds a new `DiscoveryBehaviour`.
///
/// `user_defined` is a list of known address for nodes that never expire.
pub async fn new(
local_public_key: PublicKey,
user_defined: Vec<(PeerId, Multiaddr)>,
enable_mdns: bool,
allow_private_ipv4: bool,
discovery_only_if_under_num: u64,
) -> Self {
if enable_mdns {
#[cfg(target_os = "unknown")]
warn!(target: "sub-libp2p", "mDNS is not available on this platform");
}
let local_id = local_public_key.clone().into_peer_id();
let store = MemoryStore::new(local_id.clone());
let mut kademlia = Kademlia::new(local_id.clone(), store);
for (peer_id, addr) in &user_defined {
kademlia.add_address(peer_id, addr.clone());
}
DiscoveryBehaviour {
user_defined,
kademlia,
next_kad_random_query: Delay::new(Duration::new(0, 0)),
duration_to_next_kad: Duration::from_secs(1),
discoveries: VecDeque::new(),
local_peer_id: local_public_key.into_peer_id(),
num_connections: 0,
allow_private_ipv4,
discovery_only_if_under_num,
#[cfg(not(target_os = "unknown"))]
mdns: if enable_mdns {
match Mdns::new() {
Ok(mdns) => Some(mdns).into(),
Err(err) => {
warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
None.into()
}
}
} else {
None.into()
},
}
}
/// Returns the list of nodes that we know exist in the network. /// Returns the list of nodes that we know exist in the network.
pub fn known_peers(&mut self) -> impl Iterator<Item = &PeerId> { pub fn known_peers(&mut self) -> impl Iterator<Item = &PeerId> {
self.kademlia.kbuckets_entries() let mut set = HashSet::new();
for p in self.kademlias.values_mut().map(|k| k.kbuckets_entries()).flatten() {
set.insert(p);
}
set.into_iter()
} }
/// Adds a hard-coded address for the given peer, that never expires. /// Adds a hard-coded address for the given peer, that never expires.
@@ -149,7 +238,9 @@ impl DiscoveryBehaviour {
/// If we didn't know this address before, also generates a `Discovered` event. /// If we didn't know this address before, also generates a `Discovered` event.
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
if self.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) { if self.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) {
self.kademlia.add_address(&peer_id, addr.clone()); for k in self.kademlias.values_mut() {
k.add_address(&peer_id, addr.clone())
}
self.discoveries.push_back(peer_id.clone()); self.discoveries.push_back(peer_id.clone());
self.user_defined.push((peer_id, addr)); self.user_defined.push((peer_id, addr));
} }
@@ -160,14 +251,18 @@ impl DiscoveryBehaviour {
/// **Note**: It is important that you call this method, otherwise the discovery mechanism will /// **Note**: It is important that you call this method, otherwise the discovery mechanism will
/// not properly work. /// not properly work.
pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) { pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
self.kademlia.add_address(peer_id, addr); for k in self.kademlias.values_mut() {
k.add_address(peer_id, addr.clone())
}
} }
/// Start fetching a record from the DHT. /// Start fetching a record from the DHT.
/// ///
/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated. /// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
pub fn get_value(&mut self, key: &record::Key) { pub fn get_value(&mut self, key: &record::Key) {
self.kademlia.get_record(key, Quorum::One) for k in self.kademlias.values_mut() {
k.get_record(key, Quorum::One)
}
} }
/// Start putting a record into the DHT. Other nodes can later fetch that value with /// Start putting a record into the DHT. Other nodes can later fetch that value with
@@ -175,12 +270,14 @@ impl DiscoveryBehaviour {
/// ///
/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated. /// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
pub fn put_value(&mut self, key: record::Key, value: Vec<u8>) { pub fn put_value(&mut self, key: record::Key, value: Vec<u8>) {
self.kademlia.put_record(Record::new(key, value), Quorum::All); for k in self.kademlias.values_mut() {
k.put_record(Record::new(key.clone(), value.clone()), Quorum::All)
}
} }
/// Returns the number of nodes that are in the Kademlia k-buckets. /// Returns the number of nodes that are in the Kademlia k-buckets.
pub fn num_kbuckets_entries(&mut self) -> usize { pub fn num_kbuckets_entries(&mut self) -> usize {
self.kademlia.kbuckets_entries().count() self.known_peers().count()
} }
} }
@@ -215,11 +312,19 @@ pub enum DiscoveryOut {
} }
impl NetworkBehaviour for DiscoveryBehaviour { impl NetworkBehaviour for DiscoveryBehaviour {
type ProtocolsHandler = <Kademlia<MemoryStore> as NetworkBehaviour>::ProtocolsHandler; type ProtocolsHandler = MultiHandler<ProtocolId, KademliaHandler<QueryId>>;
type OutEvent = DiscoveryOut; type OutEvent = DiscoveryOut;
fn new_handler(&mut self) -> Self::ProtocolsHandler { fn new_handler(&mut self) -> Self::ProtocolsHandler {
NetworkBehaviour::new_handler(&mut self.kademlia) let iter = self.kademlias.iter_mut()
.map(|(p, k)| (p.clone(), NetworkBehaviour::new_handler(k)));
MultiHandler::try_from_iter(iter)
.expect("There can be at most one handler per `ProtocolId` and \
protocol names contain the `ProtocolId` so no two protocol \
names in `self.kademlias` can be equal which is the only error \
`try_from_iter` can return, therefore this call is guaranteed \
to succeed; qed")
} }
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> { fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
@@ -228,7 +333,11 @@ impl NetworkBehaviour for DiscoveryBehaviour {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
{ {
let mut list_to_filter = self.kademlia.addresses_of_peer(peer_id); let mut list_to_filter = Vec::new();
for k in self.kademlias.values_mut() {
list_to_filter.extend(k.addresses_of_peer(peer_id))
}
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
list_to_filter.extend(self.mdns.addresses_of_peer(peer_id)); list_to_filter.extend(self.mdns.addresses_of_peer(peer_id));
@@ -248,13 +357,23 @@ impl NetworkBehaviour for DiscoveryBehaviour {
} }
trace!(target: "sub-libp2p", "Addresses of {:?} are {:?}", peer_id, list); trace!(target: "sub-libp2p", "Addresses of {:?} are {:?}", peer_id, list);
if list.is_empty() { if list.is_empty() {
if self.kademlia.kbuckets_entries().any(|p| p == peer_id) { let mut has_entry = false;
debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer in k-buckets), \ for k in self.kademlias.values_mut() {
and no address was found", peer_id); if k.kbuckets_entries().any(|p| p == peer_id) {
has_entry = true;
break
}
}
if has_entry {
debug!(target: "sub-libp2p",
"Requested dialing to {:?} (peer in k-buckets), and no address was found",
peer_id);
} else { } else {
debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer not in k-buckets), \ debug!(target: "sub-libp2p",
and no address was found", peer_id); "Requested dialing to {:?} (peer not in k-buckets), and no address was found",
peer_id);
} }
} }
list list
@@ -262,20 +381,28 @@ impl NetworkBehaviour for DiscoveryBehaviour {
fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
self.num_connections += 1; self.num_connections += 1;
NetworkBehaviour::inject_connection_established(&mut self.kademlia, peer_id, conn, endpoint) for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_connection_established(k, peer_id, conn, endpoint)
}
} }
fn inject_connected(&mut self, peer_id: &PeerId) { fn inject_connected(&mut self, peer_id: &PeerId) {
NetworkBehaviour::inject_connected(&mut self.kademlia, peer_id) for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_connected(k, peer_id)
}
} }
fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
self.num_connections -= 1; self.num_connections -= 1;
NetworkBehaviour::inject_connection_closed(&mut self.kademlia, peer_id, conn, endpoint) for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_connection_closed(k, peer_id, conn, endpoint)
}
} }
fn inject_disconnected(&mut self, peer_id: &PeerId) { fn inject_disconnected(&mut self, peer_id: &PeerId) {
NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id) for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_disconnected(k, peer_id)
}
} }
fn inject_addr_reach_failure( fn inject_addr_reach_failure(
@@ -284,45 +411,65 @@ impl NetworkBehaviour for DiscoveryBehaviour {
addr: &Multiaddr, addr: &Multiaddr,
error: &dyn std::error::Error error: &dyn std::error::Error
) { ) {
NetworkBehaviour::inject_addr_reach_failure(&mut self.kademlia, peer_id, addr, error) for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_addr_reach_failure(k, peer_id, addr, error)
}
} }
fn inject_event( fn inject_event(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
connection: ConnectionId, connection: ConnectionId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent, (pid, event): <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) { ) {
NetworkBehaviour::inject_event(&mut self.kademlia, peer_id, connection, event) if let Some(kad) = self.kademlias.get_mut(&pid) {
return kad.inject_event(peer_id, connection, event)
}
log::error!(target: "sub-libp2p",
"inject_node_event: no kademlia instance registered for protocol {:?}",
pid)
} }
fn inject_new_external_addr(&mut self, addr: &Multiaddr) { fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
let new_addr = addr.clone() let new_addr = addr.clone()
.with(Protocol::P2p(self.local_peer_id.clone().into())); .with(Protocol::P2p(self.local_peer_id.clone().into()));
info!(target: "sub-libp2p", "🔍 Discovered new external address for our node: {}", new_addr); info!(target: "sub-libp2p", "🔍 Discovered new external address for our node: {}", new_addr);
NetworkBehaviour::inject_new_external_addr(&mut self.kademlia, addr) for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_new_external_addr(k, addr)
}
} }
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
info!(target: "sub-libp2p", "No longer listening on {}", addr); info!(target: "sub-libp2p", "No longer listening on {}", addr);
NetworkBehaviour::inject_expired_listen_addr(&mut self.kademlia, addr) for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_expired_listen_addr(k, addr)
}
} }
fn inject_dial_failure(&mut self, peer_id: &PeerId) { fn inject_dial_failure(&mut self, peer_id: &PeerId) {
NetworkBehaviour::inject_dial_failure(&mut self.kademlia, peer_id) for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_dial_failure(k, peer_id)
}
} }
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
NetworkBehaviour::inject_new_listen_addr(&mut self.kademlia, addr) for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_new_listen_addr(k, addr)
}
} }
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) { fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
error!(target: "sub-libp2p", "Error on libp2p listener {:?}: {}", id, err); error!(target: "sub-libp2p", "Error on libp2p listener {:?}: {}", id, err);
NetworkBehaviour::inject_listener_error(&mut self.kademlia, id, err); for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_listener_error(k, id, err)
}
} }
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) { fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
NetworkBehaviour::inject_listener_closed(&mut self.kademlia, id, reason); error!(target: "sub-libp2p", "Libp2p listener {:?} closed", id);
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_listener_closed(k, id, reason)
}
} }
fn poll( fn poll(
@@ -348,10 +495,10 @@ impl NetworkBehaviour for DiscoveryBehaviour {
debug!(target: "sub-libp2p", debug!(target: "sub-libp2p",
"Libp2p <= Starting random Kademlia request for {:?}", "Libp2p <= Starting random Kademlia request for {:?}",
random_peer_id); random_peer_id);
for k in self.kademlias.values_mut() {
self.kademlia.get_closest_peers(random_peer_id); k.get_closest_peers(random_peer_id.clone())
}
true true
} else { } else {
debug!( debug!(
target: "sub-libp2p", target: "sub-libp2p",
@@ -373,96 +520,102 @@ impl NetworkBehaviour for DiscoveryBehaviour {
} }
} }
// Poll Kademlia. // Poll Kademlias.
while let Poll::Ready(ev) = self.kademlia.poll(cx, params) { for (pid, kademlia) in &mut self.kademlias {
match ev { while let Poll::Ready(ev) = kademlia.poll(cx, params) {
NetworkBehaviourAction::GenerateEvent(ev) => match ev { match ev {
KademliaEvent::UnroutablePeer { peer, .. } => { NetworkBehaviourAction::GenerateEvent(ev) => match ev {
let ev = DiscoveryOut::UnroutablePeer(peer); KademliaEvent::UnroutablePeer { peer, .. } => {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); let ev = DiscoveryOut::UnroutablePeer(peer);
} return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
KademliaEvent::RoutingUpdated { peer, .. } => { }
let ev = DiscoveryOut::Discovered(peer); KademliaEvent::RoutingUpdated { peer, .. } => {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); let ev = DiscoveryOut::Discovered(peer);
} return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
KademliaEvent::GetClosestPeersResult(res) => { }
match res { KademliaEvent::GetClosestPeersResult(res) => {
Err(GetClosestPeersError::Timeout { key, peers }) => { match res {
debug!(target: "sub-libp2p", Err(GetClosestPeersError::Timeout { key, peers }) => {
"Libp2p => Query for {:?} timed out with {} results", debug!(target: "sub-libp2p",
HexDisplay::from(&key), peers.len()); "Libp2p => Query for {:?} timed out with {} results",
}, HexDisplay::from(&key), peers.len());
Ok(ok) => { },
trace!(target: "sub-libp2p", Ok(ok) => {
"Libp2p => Query for {:?} yielded {:?} results", trace!(target: "sub-libp2p",
HexDisplay::from(&ok.key), ok.peers.len()); "Libp2p => Query for {:?} yielded {:?} results",
if ok.peers.is_empty() && self.num_connections != 0 { HexDisplay::from(&ok.key), ok.peers.len());
debug!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \ if ok.peers.is_empty() && self.num_connections != 0 {
results"); debug!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
results");
}
} }
} }
} }
} KademliaEvent::GetRecordResult(res) => {
KademliaEvent::GetRecordResult(res) => { let ev = match res {
let ev = match res { Ok(ok) => {
Ok(ok) => { let results = ok.records
let results = ok.records .into_iter()
.into_iter() .map(|r| (r.key, r.value))
.map(|r| (r.key, r.value)) .collect();
.collect();
DiscoveryOut::ValueFound(results) DiscoveryOut::ValueFound(results)
}
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
trace!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key())
}
Err(e) => {
warn!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key())
}
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::PutRecordResult(res) => {
let ev = match res {
Ok(ok) => DiscoveryOut::ValuePut(ok.key),
Err(e) => {
warn!(target: "sub-libp2p",
"Libp2p => Failed to put record: {:?}", e);
DiscoveryOut::ValuePutFailed(e.into_key())
}
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::RepublishRecordResult(res) => {
match res {
Ok(ok) => debug!(target: "sub-libp2p",
"Libp2p => Record republished: {:?}",
ok.key),
Err(e) => warn!(target: "sub-libp2p",
"Libp2p => Republishing of record {:?} failed with: {:?}",
e.key(), e)
} }
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => { }
trace!(target: "sub-libp2p", KademliaEvent::Discovered { .. } => {
"Libp2p => Failed to get record: {:?}", e); // We are not interested in these events at the moment.
DiscoveryOut::ValueNotFound(e.into_key()) }
} // We never start any other type of query.
Err(e) => { e => {
warn!(target: "sub-libp2p", warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key())
}
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::PutRecordResult(res) => {
let ev = match res {
Ok(ok) => DiscoveryOut::ValuePut(ok.key),
Err(e) => {
warn!(target: "sub-libp2p",
"Libp2p => Failed to put record: {:?}", e);
DiscoveryOut::ValuePutFailed(e.into_key())
}
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::RepublishRecordResult(res) => {
match res {
Ok(ok) => debug!(target: "sub-libp2p",
"Libp2p => Record republished: {:?}",
ok.key),
Err(e) => warn!(target: "sub-libp2p",
"Libp2p => Republishing of record {:?} failed with: {:?}",
e.key(), e)
} }
} }
KademliaEvent::Discovered { .. } => { NetworkBehaviourAction::DialAddress { address } =>
// We are not interested in these events at the moment. return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
} NetworkBehaviourAction::DialPeer { peer_id, condition } =>
// We never start any other type of query. return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
e => { NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e) return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
} peer_id,
}, handler,
NetworkBehaviourAction::DialAddress { address } => event: (pid.clone(), event)
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }), }),
NetworkBehaviourAction::DialPeer { peer_id, condition } => NetworkBehaviourAction::ReportObservedAddr { address } =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }), return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } => }
return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }),
NetworkBehaviourAction::ReportObservedAddr { address } =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
} }
} }
@@ -511,7 +664,7 @@ mod tests {
use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt}; use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt};
use libp2p::swarm::Swarm; use libp2p::swarm::Swarm;
use std::{collections::HashSet, task::Poll}; use std::{collections::HashSet, task::Poll};
use super::{DiscoveryBehaviour, DiscoveryOut}; use super::{DiscoveryConfig, DiscoveryOut};
#[test] #[test]
fn discovery_working() { fn discovery_working() {
@@ -540,13 +693,14 @@ mod tests {
upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1) upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1)
}); });
let behaviour = futures::executor::block_on({ let behaviour = {
let user_defined = user_defined.clone(); let mut config = DiscoveryConfig::new(keypair.public());
let keypair_public = keypair.public(); config.with_user_defined(user_defined.clone())
async move { .allow_private_ipv4(true)
DiscoveryBehaviour::new(keypair_public, user_defined, false, true, 50).await .discovery_limit(50);
} config.finish()
}); };
let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap(); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
+27 -13
View File
@@ -28,6 +28,7 @@
use crate::{ use crate::{
behaviour::{Behaviour, BehaviourOut}, behaviour::{Behaviour, BehaviourOut},
config::{parse_addr, parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig}, config::{parse_addr, parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
discovery::DiscoveryConfig,
error::Error, error::Error,
network_state::{ network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
@@ -310,24 +311,37 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
peerset_handle.clone(), peerset_handle.clone(),
) )
}; };
let mut behaviour = futures::executor::block_on(Behaviour::new(
let discovery_config = {
let mut config = DiscoveryConfig::new(local_public.clone());
config.with_user_defined(known_addresses);
config.discovery_limit(u64::from(params.network_config.out_peers) + 15);
config.add_protocol(params.protocol_id.clone());
match params.network_config.transport {
TransportConfig::MemoryOnly => {
config.with_mdns(false);
config.allow_private_ipv4(false);
}
TransportConfig::Normal { enable_mdns, allow_private_ipv4, .. } => {
config.with_mdns(enable_mdns);
config.allow_private_ipv4(allow_private_ipv4);
}
}
config
};
let mut behaviour = Behaviour::new(
protocol, protocol,
params.role, params.role,
user_agent, user_agent,
local_public, local_public,
known_addresses,
match params.network_config.transport {
TransportConfig::MemoryOnly => false,
TransportConfig::Normal { enable_mdns, .. } => enable_mdns,
},
match params.network_config.transport {
TransportConfig::MemoryOnly => false,
TransportConfig::Normal { allow_private_ipv4, .. } => allow_private_ipv4,
},
u64::from(params.network_config.out_peers) + 15,
block_requests, block_requests,
light_client_handler light_client_handler,
)); discovery_config
);
for (engine_id, protocol_name) in &params.network_config.notifications_protocols { for (engine_id, protocol_name) in &params.network_config.notifications_protocols {
behaviour.register_notifications_protocol(*engine_id, protocol_name.clone()); behaviour.register_notifications_protocol(*engine_id, protocol_name.clone());
} }