Use a Kademlia instance per ProtocolId. (#5045)

This commit is contained in:
Toralf Wittner
2020-04-16 10:43:40 +02:00
committed by GitHub
parent 980b635c8d
commit 4db45a85de
3 changed files with 358 additions and 199 deletions
+4 -13
View File
@@ -16,7 +16,7 @@
use crate::{
config::Role,
debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut,
debug_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
Event, ObservedRole, DhtEvent, ExHashT,
};
use crate::protocol::{self, light_client_handler, message::Roles, CustomMessageOutcome, Protocol};
@@ -67,28 +67,19 @@ pub enum BehaviourOut<B: BlockT> {
impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
/// Builds a new `Behaviour`.
pub async fn new(
pub fn new(
substrate: Protocol<B, H>,
role: Role,
user_agent: String,
local_public_key: PublicKey,
known_addresses: Vec<(PeerId, Multiaddr)>,
enable_mdns: bool,
allow_private_ipv4: bool,
discovery_only_if_under_num: u64,
block_requests: protocol::BlockRequests<B>,
light_client_handler: protocol::LightClientHandler<B>,
disco_config: DiscoveryConfig,
) -> Self {
Behaviour {
substrate,
debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()),
discovery: DiscoveryBehaviour::new(
local_public_key,
known_addresses,
enable_mdns,
allow_private_ipv4,
discovery_only_if_under_num,
).await,
discovery: disco_config.finish(),
block_requests,
light_client_handler,
events: Vec::new(),
+327 -173
View File
@@ -45,30 +45,162 @@
//! of a node's address, you must call `add_self_reported_address`.
//!
use crate::config::ProtocolId;
use futures::prelude::*;
use futures_timer::Delay;
use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey};
use libp2p::swarm::{ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::kad::{Kademlia, KademliaEvent, Quorum, Record};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use libp2p::swarm::protocols_handler::multi::MultiHandler;
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, Quorum, Record};
use libp2p::kad::GetClosestPeersError;
use libp2p::kad::handler::KademliaHandler;
use libp2p::kad::QueryId;
use libp2p::kad::record::{self, store::MemoryStore};
#[cfg(not(target_os = "unknown"))]
use libp2p::{swarm::toggle::Toggle};
use libp2p::swarm::toggle::Toggle;
#[cfg(not(target_os = "unknown"))]
use libp2p::mdns::{Mdns, MdnsEvent};
use libp2p::multiaddr::Protocol;
use log::{debug, info, trace, warn, error};
use std::{cmp, collections::VecDeque, io, time::Duration};
use std::{cmp, collections::{HashMap, HashSet, VecDeque}, io, time::Duration};
use std::task::{Context, Poll};
use sp_core::hexdisplay::HexDisplay;
/// `DiscoveryBehaviour` configuration.
pub struct DiscoveryConfig {
local_peer_id: PeerId,
user_defined: Vec<(PeerId, Multiaddr)>,
allow_private_ipv4: bool,
discovery_only_if_under_num: u64,
enable_mdns: bool,
kademlias: HashMap<ProtocolId, Kademlia<MemoryStore>>
}
impl DiscoveryConfig {
/// Crate a default configuration with the given public key.
pub fn new(local_public_key: PublicKey) -> Self {
let mut this = DiscoveryConfig {
local_peer_id: local_public_key.into_peer_id(),
user_defined: Vec::new(),
allow_private_ipv4: true,
discovery_only_if_under_num: std::u64::MAX,
enable_mdns: false,
kademlias: HashMap::new()
};
// Temporary hack to retain backwards compatibility.
// We should eventually remove the special handling of DEFAULT_PROTO_NAME.
let proto_id = ProtocolId::from(libp2p::kad::protocol::DEFAULT_PROTO_NAME);
let proto_name = Vec::from(proto_id.as_bytes());
this.add_kademlia(proto_id, proto_name);
this
}
/// Set the number of active connections at which we pause discovery.
pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
self.discovery_only_if_under_num = limit;
self
}
/// Set custom nodes which never expire, e.g. bootstrap or reserved nodes.
pub fn with_user_defined<I>(&mut self, user_defined: I) -> &mut Self
where
I: IntoIterator<Item = (PeerId, Multiaddr)>
{
for (peer_id, addr) in user_defined {
for kad in self.kademlias.values_mut() {
kad.add_address(&peer_id, addr.clone())
}
self.user_defined.push((peer_id, addr))
}
self
}
/// Should private IPv4 addresses be reported?
pub fn allow_private_ipv4(&mut self, value: bool) -> &mut Self {
self.allow_private_ipv4 = value;
self
}
/// Should MDNS discovery be supported?
pub fn with_mdns(&mut self, value: bool) -> &mut Self {
if value && cfg!(target_os = "unknown") {
log::warn!(target: "sub-libp2p", "mDNS is not available on this platform")
}
self.enable_mdns = value;
self
}
/// Add discovery via Kademlia for the given protocol.
pub fn add_protocol(&mut self, p: ProtocolId) -> &mut Self {
// NB: If this protocol name derivation is changed, check if
// `DiscoveryBehaviour::new_handler` is still correct.
let proto_name = {
let mut v = vec![b'/'];
v.extend_from_slice(p.as_bytes());
v.extend_from_slice(b"/kad");
v
};
self.add_kademlia(p, proto_name);
self
}
fn add_kademlia(&mut self, id: ProtocolId, proto_name: Vec<u8>) {
if self.kademlias.contains_key(&id) {
warn!(target: "sub-libp2p", "Discovery already registered for protocol {:?}", id);
return
}
let mut config = KademliaConfig::default();
config.set_protocol_name(proto_name);
let store = MemoryStore::new(self.local_peer_id.clone());
let mut kad = Kademlia::with_config(self.local_peer_id.clone(), store, config);
for (peer_id, addr) in &self.user_defined {
kad.add_address(peer_id, addr.clone());
}
self.kademlias.insert(id, kad);
}
/// Create a `DiscoveryBehaviour` from this config.
pub fn finish(self) -> DiscoveryBehaviour {
DiscoveryBehaviour {
user_defined: self.user_defined,
kademlias: self.kademlias,
next_kad_random_query: Delay::new(Duration::new(0, 0)),
duration_to_next_kad: Duration::from_secs(1),
discoveries: VecDeque::new(),
local_peer_id: self.local_peer_id,
num_connections: 0,
allow_private_ipv4: self.allow_private_ipv4,
discovery_only_if_under_num: self.discovery_only_if_under_num,
#[cfg(not(target_os = "unknown"))]
mdns: if self.enable_mdns {
match Mdns::new() {
Ok(mdns) => Some(mdns).into(),
Err(err) => {
warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
None.into()
}
}
} else {
None.into()
},
}
}
}
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
pub struct DiscoveryBehaviour {
/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
/// reserved nodes.
user_defined: Vec<(PeerId, Multiaddr)>,
/// Kademlia requests and answers.
kademlia: Kademlia<MemoryStore>,
kademlias: HashMap<ProtocolId, Kademlia<MemoryStore>>,
/// Discovers nodes on the local network.
#[cfg(not(target_os = "unknown"))]
mdns: Toggle<Mdns>,
@@ -90,56 +222,13 @@ pub struct DiscoveryBehaviour {
}
impl DiscoveryBehaviour {
/// Builds a new `DiscoveryBehaviour`.
///
/// `user_defined` is a list of known address for nodes that never expire.
pub async fn new(
local_public_key: PublicKey,
user_defined: Vec<(PeerId, Multiaddr)>,
enable_mdns: bool,
allow_private_ipv4: bool,
discovery_only_if_under_num: u64,
) -> Self {
if enable_mdns {
#[cfg(target_os = "unknown")]
warn!(target: "sub-libp2p", "mDNS is not available on this platform");
}
let local_id = local_public_key.clone().into_peer_id();
let store = MemoryStore::new(local_id.clone());
let mut kademlia = Kademlia::new(local_id.clone(), store);
for (peer_id, addr) in &user_defined {
kademlia.add_address(peer_id, addr.clone());
}
DiscoveryBehaviour {
user_defined,
kademlia,
next_kad_random_query: Delay::new(Duration::new(0, 0)),
duration_to_next_kad: Duration::from_secs(1),
discoveries: VecDeque::new(),
local_peer_id: local_public_key.into_peer_id(),
num_connections: 0,
allow_private_ipv4,
discovery_only_if_under_num,
#[cfg(not(target_os = "unknown"))]
mdns: if enable_mdns {
match Mdns::new() {
Ok(mdns) => Some(mdns).into(),
Err(err) => {
warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
None.into()
}
}
} else {
None.into()
},
}
}
/// Returns the list of nodes that we know exist in the network.
pub fn known_peers(&mut self) -> impl Iterator<Item = &PeerId> {
self.kademlia.kbuckets_entries()
let mut set = HashSet::new();
for p in self.kademlias.values_mut().map(|k| k.kbuckets_entries()).flatten() {
set.insert(p);
}
set.into_iter()
}
/// Adds a hard-coded address for the given peer, that never expires.
@@ -149,7 +238,9 @@ impl DiscoveryBehaviour {
/// If we didn't know this address before, also generates a `Discovered` event.
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
if self.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) {
self.kademlia.add_address(&peer_id, addr.clone());
for k in self.kademlias.values_mut() {
k.add_address(&peer_id, addr.clone())
}
self.discoveries.push_back(peer_id.clone());
self.user_defined.push((peer_id, addr));
}
@@ -160,14 +251,18 @@ impl DiscoveryBehaviour {
/// **Note**: It is important that you call this method, otherwise the discovery mechanism will
/// not properly work.
pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
self.kademlia.add_address(peer_id, addr);
for k in self.kademlias.values_mut() {
k.add_address(peer_id, addr.clone())
}
}
/// Start fetching a record from the DHT.
///
/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
pub fn get_value(&mut self, key: &record::Key) {
self.kademlia.get_record(key, Quorum::One)
for k in self.kademlias.values_mut() {
k.get_record(key, Quorum::One)
}
}
/// Start putting a record into the DHT. Other nodes can later fetch that value with
@@ -175,12 +270,14 @@ impl DiscoveryBehaviour {
///
/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
pub fn put_value(&mut self, key: record::Key, value: Vec<u8>) {
self.kademlia.put_record(Record::new(key, value), Quorum::All);
for k in self.kademlias.values_mut() {
k.put_record(Record::new(key.clone(), value.clone()), Quorum::All)
}
}
/// Returns the number of nodes that are in the Kademlia k-buckets.
pub fn num_kbuckets_entries(&mut self) -> usize {
self.kademlia.kbuckets_entries().count()
self.known_peers().count()
}
}
@@ -215,11 +312,19 @@ pub enum DiscoveryOut {
}
impl NetworkBehaviour for DiscoveryBehaviour {
type ProtocolsHandler = <Kademlia<MemoryStore> as NetworkBehaviour>::ProtocolsHandler;
type ProtocolsHandler = MultiHandler<ProtocolId, KademliaHandler<QueryId>>;
type OutEvent = DiscoveryOut;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
NetworkBehaviour::new_handler(&mut self.kademlia)
let iter = self.kademlias.iter_mut()
.map(|(p, k)| (p.clone(), NetworkBehaviour::new_handler(k)));
MultiHandler::try_from_iter(iter)
.expect("There can be at most one handler per `ProtocolId` and \
protocol names contain the `ProtocolId` so no two protocol \
names in `self.kademlias` can be equal which is the only error \
`try_from_iter` can return, therefore this call is guaranteed \
to succeed; qed")
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
@@ -228,7 +333,11 @@ impl NetworkBehaviour for DiscoveryBehaviour {
.collect::<Vec<_>>();
{
let mut list_to_filter = self.kademlia.addresses_of_peer(peer_id);
let mut list_to_filter = Vec::new();
for k in self.kademlias.values_mut() {
list_to_filter.extend(k.addresses_of_peer(peer_id))
}
#[cfg(not(target_os = "unknown"))]
list_to_filter.extend(self.mdns.addresses_of_peer(peer_id));
@@ -248,13 +357,23 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
trace!(target: "sub-libp2p", "Addresses of {:?} are {:?}", peer_id, list);
if list.is_empty() {
if self.kademlia.kbuckets_entries().any(|p| p == peer_id) {
debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer in k-buckets), \
and no address was found", peer_id);
let mut has_entry = false;
for k in self.kademlias.values_mut() {
if k.kbuckets_entries().any(|p| p == peer_id) {
has_entry = true;
break
}
}
if has_entry {
debug!(target: "sub-libp2p",
"Requested dialing to {:?} (peer in k-buckets), and no address was found",
peer_id);
} else {
debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer not in k-buckets), \
and no address was found", peer_id);
debug!(target: "sub-libp2p",
"Requested dialing to {:?} (peer not in k-buckets), and no address was found",
peer_id);
}
}
list
@@ -262,20 +381,28 @@ impl NetworkBehaviour for DiscoveryBehaviour {
fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
self.num_connections += 1;
NetworkBehaviour::inject_connection_established(&mut self.kademlia, peer_id, conn, endpoint)
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_connection_established(k, peer_id, conn, endpoint)
}
}
fn inject_connected(&mut self, peer_id: &PeerId) {
NetworkBehaviour::inject_connected(&mut self.kademlia, peer_id)
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_connected(k, peer_id)
}
}
fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
self.num_connections -= 1;
NetworkBehaviour::inject_connection_closed(&mut self.kademlia, peer_id, conn, endpoint)
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_connection_closed(k, peer_id, conn, endpoint)
}
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id)
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_disconnected(k, peer_id)
}
}
fn inject_addr_reach_failure(
@@ -284,45 +411,65 @@ impl NetworkBehaviour for DiscoveryBehaviour {
addr: &Multiaddr,
error: &dyn std::error::Error
) {
NetworkBehaviour::inject_addr_reach_failure(&mut self.kademlia, peer_id, addr, error)
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_addr_reach_failure(k, peer_id, addr, error)
}
}
fn inject_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
(pid, event): <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
NetworkBehaviour::inject_event(&mut self.kademlia, peer_id, connection, event)
if let Some(kad) = self.kademlias.get_mut(&pid) {
return kad.inject_event(peer_id, connection, event)
}
log::error!(target: "sub-libp2p",
"inject_node_event: no kademlia instance registered for protocol {:?}",
pid)
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
let new_addr = addr.clone()
.with(Protocol::P2p(self.local_peer_id.clone().into()));
info!(target: "sub-libp2p", "🔍 Discovered new external address for our node: {}", new_addr);
NetworkBehaviour::inject_new_external_addr(&mut self.kademlia, addr)
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_new_external_addr(k, addr)
}
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
info!(target: "sub-libp2p", "No longer listening on {}", addr);
NetworkBehaviour::inject_expired_listen_addr(&mut self.kademlia, addr)
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_expired_listen_addr(k, addr)
}
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
NetworkBehaviour::inject_dial_failure(&mut self.kademlia, peer_id)
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_dial_failure(k, peer_id)
}
}
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
NetworkBehaviour::inject_new_listen_addr(&mut self.kademlia, addr)
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_new_listen_addr(k, addr)
}
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
error!(target: "sub-libp2p", "Error on libp2p listener {:?}: {}", id, err);
NetworkBehaviour::inject_listener_error(&mut self.kademlia, id, err);
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_listener_error(k, id, err)
}
}
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
NetworkBehaviour::inject_listener_closed(&mut self.kademlia, id, reason);
error!(target: "sub-libp2p", "Libp2p listener {:?} closed", id);
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_listener_closed(k, id, reason)
}
}
fn poll(
@@ -348,10 +495,10 @@ impl NetworkBehaviour for DiscoveryBehaviour {
debug!(target: "sub-libp2p",
"Libp2p <= Starting random Kademlia request for {:?}",
random_peer_id);
self.kademlia.get_closest_peers(random_peer_id);
for k in self.kademlias.values_mut() {
k.get_closest_peers(random_peer_id.clone())
}
true
} else {
debug!(
target: "sub-libp2p",
@@ -373,96 +520,102 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}
// Poll Kademlia.
while let Poll::Ready(ev) = self.kademlia.poll(cx, params) {
match ev {
NetworkBehaviourAction::GenerateEvent(ev) => match ev {
KademliaEvent::UnroutablePeer { peer, .. } => {
let ev = DiscoveryOut::UnroutablePeer(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::RoutingUpdated { peer, .. } => {
let ev = DiscoveryOut::Discovered(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::GetClosestPeersResult(res) => {
match res {
Err(GetClosestPeersError::Timeout { key, peers }) => {
debug!(target: "sub-libp2p",
"Libp2p => Query for {:?} timed out with {} results",
HexDisplay::from(&key), peers.len());
},
Ok(ok) => {
trace!(target: "sub-libp2p",
"Libp2p => Query for {:?} yielded {:?} results",
HexDisplay::from(&ok.key), ok.peers.len());
if ok.peers.is_empty() && self.num_connections != 0 {
debug!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
results");
// Poll Kademlias.
for (pid, kademlia) in &mut self.kademlias {
while let Poll::Ready(ev) = kademlia.poll(cx, params) {
match ev {
NetworkBehaviourAction::GenerateEvent(ev) => match ev {
KademliaEvent::UnroutablePeer { peer, .. } => {
let ev = DiscoveryOut::UnroutablePeer(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::RoutingUpdated { peer, .. } => {
let ev = DiscoveryOut::Discovered(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::GetClosestPeersResult(res) => {
match res {
Err(GetClosestPeersError::Timeout { key, peers }) => {
debug!(target: "sub-libp2p",
"Libp2p => Query for {:?} timed out with {} results",
HexDisplay::from(&key), peers.len());
},
Ok(ok) => {
trace!(target: "sub-libp2p",
"Libp2p => Query for {:?} yielded {:?} results",
HexDisplay::from(&ok.key), ok.peers.len());
if ok.peers.is_empty() && self.num_connections != 0 {
debug!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
results");
}
}
}
}
}
KademliaEvent::GetRecordResult(res) => {
let ev = match res {
Ok(ok) => {
let results = ok.records
.into_iter()
.map(|r| (r.key, r.value))
.collect();
KademliaEvent::GetRecordResult(res) => {
let ev = match res {
Ok(ok) => {
let results = ok.records
.into_iter()
.map(|r| (r.key, r.value))
.collect();
DiscoveryOut::ValueFound(results)
DiscoveryOut::ValueFound(results)
}
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
trace!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key())
}
Err(e) => {
warn!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key())
}
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::PutRecordResult(res) => {
let ev = match res {
Ok(ok) => DiscoveryOut::ValuePut(ok.key),
Err(e) => {
warn!(target: "sub-libp2p",
"Libp2p => Failed to put record: {:?}", e);
DiscoveryOut::ValuePutFailed(e.into_key())
}
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::RepublishRecordResult(res) => {
match res {
Ok(ok) => debug!(target: "sub-libp2p",
"Libp2p => Record republished: {:?}",
ok.key),
Err(e) => warn!(target: "sub-libp2p",
"Libp2p => Republishing of record {:?} failed with: {:?}",
e.key(), e)
}
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
trace!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key())
}
Err(e) => {
warn!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key())
}
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::PutRecordResult(res) => {
let ev = match res {
Ok(ok) => DiscoveryOut::ValuePut(ok.key),
Err(e) => {
warn!(target: "sub-libp2p",
"Libp2p => Failed to put record: {:?}", e);
DiscoveryOut::ValuePutFailed(e.into_key())
}
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::RepublishRecordResult(res) => {
match res {
Ok(ok) => debug!(target: "sub-libp2p",
"Libp2p => Record republished: {:?}",
ok.key),
Err(e) => warn!(target: "sub-libp2p",
"Libp2p => Republishing of record {:?} failed with: {:?}",
e.key(), e)
}
KademliaEvent::Discovered { .. } => {
// We are not interested in these events at the moment.
}
// We never start any other type of query.
e => {
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
}
}
KademliaEvent::Discovered { .. } => {
// We are not interested in these events at the moment.
}
// We never start any other type of query.
e => {
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
}
},
NetworkBehaviourAction::DialAddress { address } =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
NetworkBehaviourAction::DialPeer { peer_id, condition } =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }),
NetworkBehaviourAction::ReportObservedAddr { address } =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
NetworkBehaviourAction::DialAddress { address } =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
NetworkBehaviourAction::DialPeer { peer_id, condition } =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event: (pid.clone(), event)
}),
NetworkBehaviourAction::ReportObservedAddr { address } =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
}
}
}
@@ -511,7 +664,7 @@ mod tests {
use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt};
use libp2p::swarm::Swarm;
use std::{collections::HashSet, task::Poll};
use super::{DiscoveryBehaviour, DiscoveryOut};
use super::{DiscoveryConfig, DiscoveryOut};
#[test]
fn discovery_working() {
@@ -540,13 +693,14 @@ mod tests {
upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1)
});
let behaviour = futures::executor::block_on({
let user_defined = user_defined.clone();
let keypair_public = keypair.public();
async move {
DiscoveryBehaviour::new(keypair_public, user_defined, false, true, 50).await
}
});
let behaviour = {
let mut config = DiscoveryConfig::new(keypair.public());
config.with_user_defined(user_defined.clone())
.allow_private_ipv4(true)
.discovery_limit(50);
config.finish()
};
let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
+27 -13
View File
@@ -28,6 +28,7 @@
use crate::{
behaviour::{Behaviour, BehaviourOut},
config::{parse_addr, parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
discovery::DiscoveryConfig,
error::Error,
network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
@@ -310,24 +311,37 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
peerset_handle.clone(),
)
};
let mut behaviour = futures::executor::block_on(Behaviour::new(
let discovery_config = {
let mut config = DiscoveryConfig::new(local_public.clone());
config.with_user_defined(known_addresses);
config.discovery_limit(u64::from(params.network_config.out_peers) + 15);
config.add_protocol(params.protocol_id.clone());
match params.network_config.transport {
TransportConfig::MemoryOnly => {
config.with_mdns(false);
config.allow_private_ipv4(false);
}
TransportConfig::Normal { enable_mdns, allow_private_ipv4, .. } => {
config.with_mdns(enable_mdns);
config.allow_private_ipv4(allow_private_ipv4);
}
}
config
};
let mut behaviour = Behaviour::new(
protocol,
params.role,
user_agent,
local_public,
known_addresses,
match params.network_config.transport {
TransportConfig::MemoryOnly => false,
TransportConfig::Normal { enable_mdns, .. } => enable_mdns,
},
match params.network_config.transport {
TransportConfig::MemoryOnly => false,
TransportConfig::Normal { allow_private_ipv4, .. } => allow_private_ipv4,
},
u64::from(params.network_config.out_peers) + 15,
block_requests,
light_client_handler
));
light_client_handler,
discovery_config
);
for (engine_id, protocol_name) in &params.network_config.notifications_protocols {
behaviour.register_notifications_protocol(*engine_id, protocol_name.clone());
}