mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-01 10:07:56 +00:00
Make public addresses go first in authority discovery DHT records (#3757)
Make sure explicitly set by the operator public addresses go first in the authority discovery DHT records. Also update `Discovery` behavior to eliminate duplicates in the returned addresses. This PR should improve situation with https://github.com/paritytech/polkadot-sdk/issues/3519. Obsoletes https://github.com/paritytech/polkadot-sdk/pull/3657.
This commit is contained in:
Generated
+1
@@ -15555,6 +15555,7 @@ dependencies = [
|
||||
"futures-timer",
|
||||
"ip_network",
|
||||
"libp2p",
|
||||
"linked_hash_set",
|
||||
"log",
|
||||
"multihash 0.18.1",
|
||||
"multihash-codetable",
|
||||
|
||||
@@ -55,6 +55,7 @@ fn build_authority_discovery_service<Block: BlockT>(
|
||||
prometheus_registry: Option<Registry>,
|
||||
) -> AuthorityDiscoveryService {
|
||||
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
|
||||
let auth_disc_public_addresses = config.network.public_addresses.clone();
|
||||
let authority_discovery_role = sc_authority_discovery::Role::Discover;
|
||||
let dht_event_stream = network.event_stream("authority-discovery").filter_map(|e| async move {
|
||||
match e {
|
||||
@@ -65,6 +66,7 @@ fn build_authority_discovery_service<Block: BlockT>(
|
||||
let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config(
|
||||
sc_authority_discovery::WorkerConfig {
|
||||
publish_non_global_ips: auth_disc_publish_non_global_ips,
|
||||
public_addresses: auth_disc_public_addresses,
|
||||
// Require that authority discovery records are signed.
|
||||
strict_record_validation: true,
|
||||
..Default::default()
|
||||
|
||||
@@ -807,6 +807,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
|
||||
|
||||
let shared_voter_state = rpc_setup;
|
||||
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
|
||||
let auth_disc_public_addresses = config.network.public_addresses.clone();
|
||||
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
|
||||
|
||||
let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");
|
||||
@@ -1061,6 +1062,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
|
||||
let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config(
|
||||
sc_authority_discovery::WorkerConfig {
|
||||
publish_non_global_ips: auth_disc_publish_non_global_ips,
|
||||
public_addresses: auth_disc_public_addresses,
|
||||
// Require that authority discovery records are signed.
|
||||
strict_record_validation: true,
|
||||
..Default::default()
|
||||
|
||||
@@ -422,6 +422,7 @@ pub fn new_full_base(
|
||||
|
||||
let shared_voter_state = rpc_setup;
|
||||
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
|
||||
let auth_disc_public_addresses = config.network.public_addresses.clone();
|
||||
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
|
||||
let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");
|
||||
|
||||
@@ -610,6 +611,7 @@ pub fn new_full_base(
|
||||
sc_authority_discovery::new_worker_and_service_with_config(
|
||||
sc_authority_discovery::WorkerConfig {
|
||||
publish_non_global_ips: auth_disc_publish_non_global_ips,
|
||||
public_addresses: auth_disc_public_addresses,
|
||||
..Default::default()
|
||||
},
|
||||
client.clone(),
|
||||
|
||||
@@ -29,6 +29,7 @@ multihash = { version = "0.18.1", default-features = false, features = [
|
||||
"sha2",
|
||||
"std",
|
||||
] }
|
||||
linked_hash_set = "0.1.4"
|
||||
log = { workspace = true, default-features = true }
|
||||
prost = "0.12"
|
||||
rand = "0.8.5"
|
||||
|
||||
@@ -80,6 +80,10 @@ pub struct WorkerConfig {
|
||||
/// Defaults to `true` to avoid the surprise factor.
|
||||
pub publish_non_global_ips: bool,
|
||||
|
||||
/// Public addresses set by the node operator to always publish first in the authority
|
||||
/// discovery DHT record.
|
||||
pub public_addresses: Vec<Multiaddr>,
|
||||
|
||||
/// Reject authority discovery records that are not signed by their network identity (PeerId)
|
||||
///
|
||||
/// Defaults to `false` to provide compatibility with old versions
|
||||
@@ -104,6 +108,7 @@ impl Default for WorkerConfig {
|
||||
// `authority_discovery_dht_event_received`.
|
||||
max_query_interval: Duration::from_secs(10 * 60),
|
||||
publish_non_global_ips: true,
|
||||
public_addresses: Vec::new(),
|
||||
strict_record_validation: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ use addr_cache::AddrCache;
|
||||
use codec::{Decode, Encode};
|
||||
use ip_network::IpNetwork;
|
||||
use libp2p::{core::multiaddr, identity::PublicKey, multihash::Multihash, Multiaddr, PeerId};
|
||||
use linked_hash_set::LinkedHashSet;
|
||||
use multihash_codetable::{Code, MultihashDigest};
|
||||
|
||||
use log::{debug, error, log_enabled};
|
||||
@@ -120,14 +121,22 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
|
||||
|
||||
/// Interval to be proactive, publishing own addresses.
|
||||
publish_interval: ExpIncInterval,
|
||||
|
||||
/// Pro-actively publish our own addresses at this interval, if the keys in the keystore
|
||||
/// have changed.
|
||||
publish_if_changed_interval: ExpIncInterval,
|
||||
|
||||
/// List of keys onto which addresses have been published at the latest publication.
|
||||
/// Used to check whether they have changed.
|
||||
latest_published_keys: HashSet<AuthorityId>,
|
||||
|
||||
/// Same value as in the configuration.
|
||||
publish_non_global_ips: bool,
|
||||
|
||||
/// Public addresses set by the node operator to always publish first in the authority
|
||||
/// discovery DHT record.
|
||||
public_addresses: LinkedHashSet<Multiaddr>,
|
||||
|
||||
/// Same value as in the configuration.
|
||||
strict_record_validation: bool,
|
||||
|
||||
@@ -136,6 +145,7 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
|
||||
|
||||
/// Queue of throttled lookups pending to be passed to the network.
|
||||
pending_lookups: Vec<AuthorityId>,
|
||||
|
||||
/// Set of in-flight lookups.
|
||||
in_flight_lookups: HashMap<KademliaKey, AuthorityId>,
|
||||
|
||||
@@ -224,6 +234,29 @@ where
|
||||
None => None,
|
||||
};
|
||||
|
||||
let public_addresses = {
|
||||
let local_peer_id: Multihash = network.local_peer_id().into();
|
||||
|
||||
config
|
||||
.public_addresses
|
||||
.into_iter()
|
||||
.map(|mut address| {
|
||||
if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
|
||||
if peer_id != local_peer_id {
|
||||
error!(
|
||||
target: LOG_TARGET,
|
||||
"Discarding invalid local peer ID in public address {address}.",
|
||||
);
|
||||
}
|
||||
// Always discard `/p2p/...` protocol for proper address comparison (local
|
||||
// peer id will be added before publishing).
|
||||
address.pop();
|
||||
}
|
||||
address
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
Worker {
|
||||
from_service: from_service.fuse(),
|
||||
client,
|
||||
@@ -233,6 +266,7 @@ where
|
||||
publish_if_changed_interval,
|
||||
latest_published_keys: HashSet::new(),
|
||||
publish_non_global_ips: config.publish_non_global_ips,
|
||||
public_addresses,
|
||||
strict_record_validation: config.strict_record_validation,
|
||||
query_interval,
|
||||
pending_lookups: Vec::new(),
|
||||
@@ -304,32 +338,48 @@ where
|
||||
}
|
||||
|
||||
fn addresses_to_publish(&self) -> impl Iterator<Item = Multiaddr> {
|
||||
let peer_id: Multihash = self.network.local_peer_id().into();
|
||||
let publish_non_global_ips = self.publish_non_global_ips;
|
||||
let addresses = self.network.external_addresses().into_iter().filter(move |a| {
|
||||
if publish_non_global_ips {
|
||||
return true
|
||||
}
|
||||
let addresses = self
|
||||
.public_addresses
|
||||
.clone()
|
||||
.into_iter()
|
||||
.chain(self.network.external_addresses().into_iter().filter_map(|mut address| {
|
||||
// Make sure the reported external address does not contain `/p2p/...` protocol.
|
||||
if let Some(multiaddr::Protocol::P2p(_)) = address.iter().last() {
|
||||
address.pop();
|
||||
}
|
||||
|
||||
a.iter().all(|p| match p {
|
||||
// The `ip_network` library is used because its `is_global()` method is stable,
|
||||
// while `is_global()` in the standard library currently isn't.
|
||||
multiaddr::Protocol::Ip4(ip) if !IpNetwork::from(ip).is_global() => false,
|
||||
multiaddr::Protocol::Ip6(ip) if !IpNetwork::from(ip).is_global() => false,
|
||||
_ => true,
|
||||
if self.public_addresses.contains(&address) {
|
||||
// Already added above.
|
||||
None
|
||||
} else {
|
||||
Some(address)
|
||||
}
|
||||
}))
|
||||
.filter(move |address| {
|
||||
if publish_non_global_ips {
|
||||
return true
|
||||
}
|
||||
|
||||
address.iter().all(|protocol| match protocol {
|
||||
// The `ip_network` library is used because its `is_global()` method is stable,
|
||||
// while `is_global()` in the standard library currently isn't.
|
||||
multiaddr::Protocol::Ip4(ip) if !IpNetwork::from(ip).is_global() => false,
|
||||
multiaddr::Protocol::Ip6(ip) if !IpNetwork::from(ip).is_global() => false,
|
||||
_ => true,
|
||||
})
|
||||
})
|
||||
});
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
debug!(target: LOG_TARGET, "Authority DHT record peer_id='{:?}' addresses='{:?}'", peer_id, addresses.clone().collect::<Vec<_>>());
|
||||
let peer_id = self.network.local_peer_id();
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"Authority DHT record peer_id='{peer_id}' addresses='{addresses:?}'",
|
||||
);
|
||||
|
||||
// The address must include the peer id if not already set.
|
||||
addresses.map(move |a| {
|
||||
if a.iter().any(|p| matches!(p, multiaddr::Protocol::P2p(_))) {
|
||||
a
|
||||
} else {
|
||||
a.with(multiaddr::Protocol::P2p(peer_id))
|
||||
}
|
||||
})
|
||||
// The address must include the peer id.
|
||||
let peer_id: Multihash = peer_id.into();
|
||||
addresses.into_iter().map(move |a| a.with(multiaddr::Protocol::P2p(peer_id)))
|
||||
}
|
||||
|
||||
/// Publish own public addresses.
|
||||
|
||||
@@ -29,7 +29,7 @@ futures = "0.3.21"
|
||||
futures-timer = "3.0.2"
|
||||
ip_network = "0.4.1"
|
||||
libp2p = { version = "0.51.4", features = ["dns", "identify", "kad", "macros", "mdns", "noise", "ping", "request-response", "tcp", "tokio", "websocket", "yamux"] }
|
||||
linked_hash_set = "0.1.3"
|
||||
linked_hash_set = "0.1.4"
|
||||
log = { workspace = true, default-features = true }
|
||||
mockall = "0.11.3"
|
||||
parking_lot = "0.12.1"
|
||||
|
||||
@@ -72,6 +72,7 @@ use libp2p::{
|
||||
},
|
||||
PeerId,
|
||||
};
|
||||
use linked_hash_set::LinkedHashSet;
|
||||
use log::{debug, info, trace, warn};
|
||||
use sp_core::hexdisplay::HexDisplay;
|
||||
use std::{
|
||||
@@ -550,14 +551,20 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
||||
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
|
||||
let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
|
||||
|
||||
let mut list = self
|
||||
// Collect addresses into [`LinkedHashSet`] to eliminate duplicate entries preserving the
|
||||
// order of addresses. Give priority to `permanent_addresses` (used with reserved nodes) and
|
||||
// `ephemeral_addresses` (used for addresses discovered from other sources, like authority
|
||||
// discovery DHT records).
|
||||
let mut list: LinkedHashSet<_> = self
|
||||
.permanent_addresses
|
||||
.iter()
|
||||
.filter_map(|(p, a)| (*p == peer_id).then_some(a.clone()))
|
||||
.collect::<Vec<_>>();
|
||||
.collect();
|
||||
|
||||
if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
|
||||
list.extend(ephemeral_addresses.clone());
|
||||
ephemeral_addresses.iter().for_each(|address| {
|
||||
list.insert_if_absent(address.clone());
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
@@ -583,12 +590,14 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
||||
});
|
||||
}
|
||||
|
||||
list.extend(list_to_filter);
|
||||
list_to_filter.into_iter().for_each(|address| {
|
||||
list.insert_if_absent(address);
|
||||
});
|
||||
}
|
||||
|
||||
trace!(target: "sub-libp2p", "Addresses of {:?}: {:?}", peer_id, list);
|
||||
|
||||
Ok(list)
|
||||
Ok(list.into_iter().collect())
|
||||
}
|
||||
|
||||
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
|
||||
|
||||
Reference in New Issue
Block a user