network: optimize update procedure for listen_addrs and external_addrs (#14689)

* network: optimize listen_address update procedure

* network: optimize external_addr update procedure

* replace on_swarm_event with add/remove
This commit is contained in:
Alin Dima
2023-08-01 13:39:15 +03:00
committed by GitHub
parent 9cbe7fdbf4
commit addd8628ad
3 changed files with 57 additions and 24 deletions
+8 -2
View File
@@ -34,9 +34,10 @@ use libp2p::{
identity::PublicKey, kad::RecordKey, swarm::NetworkBehaviour, PeerId,
};
use parking_lot::Mutex;
use sc_network_common::role::{ObservedRole, Roles};
use sp_runtime::traits::Block as BlockT;
use std::{collections::HashSet, time::Duration};
use std::{collections::HashSet, sync::Arc, time::Duration};
pub use crate::request_responses::{InboundFailure, OutboundFailure, RequestId, ResponseFailure};
@@ -174,10 +175,15 @@ impl<B: BlockT> Behaviour<B> {
request_response_protocols: Vec<ProtocolConfig>,
peerset: PeersetHandle,
connection_limits: ConnectionLimits,
external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
) -> Result<Self, request_responses::RegisterError> {
Ok(Self {
substrate,
peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key),
peer_info: peer_info::PeerInfoBehaviour::new(
user_agent,
local_public_key,
external_addresses,
),
discovery: disco_config.finish(),
connection_limits: libp2p::connection_limits::Behaviour::new(connection_limits),
request_responses: request_responses::RequestResponsesBehaviour::new(
+31 -2
View File
@@ -43,11 +43,13 @@ use libp2p::{
Multiaddr, PeerId,
};
use log::{debug, error, trace};
use parking_lot::Mutex;
use smallvec::SmallVec;
use std::{
collections::hash_map::Entry,
collections::{hash_map::Entry, HashSet},
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
@@ -67,6 +69,8 @@ pub struct PeerInfoBehaviour {
nodes_info: FnvHashMap<PeerId, NodeInfo>,
/// Interval at which we perform garbage collection in `nodes_info`.
garbage_collect: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Record keeping of external addresses. Data is queried by the `NetworkService`.
external_addresses: ExternalAddresses,
}
/// Information about a node we're connected to.
@@ -91,9 +95,31 @@ impl NodeInfo {
}
}
/// Utility struct for tracking external addresses. The data is shared with the `NetworkService`.
#[derive(Debug, Clone, Default)]
pub struct ExternalAddresses {
addresses: Arc<Mutex<HashSet<Multiaddr>>>,
}
impl ExternalAddresses {
/// Add an external address.
pub fn add(&mut self, addr: Multiaddr) {
self.addresses.lock().insert(addr);
}
/// Remove an external address.
pub fn remove(&mut self, addr: &Multiaddr) {
self.addresses.lock().remove(addr);
}
}
impl PeerInfoBehaviour {
/// Builds a new `PeerInfoBehaviour`.
pub fn new(user_agent: String, local_public_key: PublicKey) -> Self {
pub fn new(
user_agent: String,
local_public_key: PublicKey,
external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
) -> Self {
let identify = {
let cfg = IdentifyConfig::new("/substrate/1.0".to_string(), local_public_key)
.with_agent_version(user_agent)
@@ -107,6 +133,7 @@ impl PeerInfoBehaviour {
identify,
nodes_info: FnvHashMap::default(),
garbage_collect: Box::pin(interval(GARBAGE_COLLECT_INTERVAL)),
external_addresses: ExternalAddresses { addresses: external_addresses },
}
}
@@ -367,6 +394,7 @@ impl NetworkBehaviour for PeerInfoBehaviour {
FromSwarm::ExpiredListenAddr(e) => {
self.ping.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
self.identify.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
self.external_addresses.remove(e.addr);
},
FromSwarm::NewExternalAddrCandidate(e) => {
self.ping.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
@@ -375,6 +403,7 @@ impl NetworkBehaviour for PeerInfoBehaviour {
FromSwarm::ExternalAddrConfirmed(e) => {
self.ping.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
self.identify.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
self.external_addresses.add(e.addr.clone());
},
FromSwarm::AddressChange(e @ AddressChange { peer_id, old, new, .. }) => {
self.ping.on_swarm_event(FromSwarm::AddressChange(e));
+18 -20
View File
@@ -104,9 +104,9 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
/// Number of peers we're connected to.
num_connected: Arc<AtomicUsize>,
/// The local external addresses.
external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
/// Listen addresses. Do **NOT** include a trailing `/p2p/` with our `PeerId`.
listen_addresses: Arc<Mutex<Vec<Multiaddr>>>,
listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
/// Local copy of the `PeerId` of the local node.
local_peer_id: PeerId,
/// The `KeyPair` that defines the `PeerId` of the local node.
@@ -301,6 +301,7 @@ where
})?;
let num_connected = Arc::new(AtomicUsize::new(0));
let external_addresses = Arc::new(Mutex::new(HashSet::new()));
// Build the swarm.
let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
@@ -354,6 +355,7 @@ where
.with_max_established_incoming(Some(
crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING,
)),
external_addresses.clone(),
);
match result {
@@ -412,13 +414,12 @@ where
Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone());
}
let external_addresses = Arc::new(Mutex::new(Vec::new()));
let listen_addresses = Arc::new(Mutex::new(Vec::new()));
let listen_addresses = Arc::new(Mutex::new(HashSet::new()));
let peers_notifications_sinks = Arc::new(Mutex::new(HashMap::new()));
let service = Arc::new(NetworkService {
bandwidth,
external_addresses: external_addresses.clone(),
external_addresses,
listen_addresses: listen_addresses.clone(),
num_connected: num_connected.clone(),
peerset: peerset_handle,
@@ -434,7 +435,6 @@ where
});
Ok(NetworkWorker {
external_addresses,
listen_addresses,
num_connected,
network_service: swarm,
@@ -694,12 +694,12 @@ where
{
/// Returns the local external addresses.
fn external_addresses(&self) -> Vec<Multiaddr> {
self.external_addresses.lock().clone()
self.external_addresses.lock().iter().cloned().collect()
}
/// Returns the listener addresses (without trailing `/p2p/` with our `PeerId`).
fn listen_addresses(&self) -> Vec<Multiaddr> {
self.listen_addresses.lock().clone()
self.listen_addresses.lock().iter().cloned().collect()
}
/// Returns the local Peer ID.
@@ -1123,9 +1123,7 @@ where
H: ExHashT,
{
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
listen_addresses: Arc<Mutex<Vec<Multiaddr>>>,
listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
num_connected: Arc<AtomicUsize>,
/// The network service that can be extracted and shared through the codebase.
@@ -1182,18 +1180,10 @@ where
},
};
// Update the variables shared with the `NetworkService`.
// Update the `num_connected` count shared with the `NetworkService`.
let num_connected_peers =
self.network_service.behaviour_mut().user_protocol_mut().num_connected_peers();
self.num_connected.store(num_connected_peers, Ordering::Relaxed);
{
let external_addresses = self.network_service.external_addresses().cloned().collect();
*self.external_addresses.lock() = external_addresses;
let listen_addresses =
self.network_service.listeners().map(ToOwned::to_owned).collect();
*self.listen_addresses.lock() = listen_addresses;
}
if let Some(metrics) = self.metrics.as_ref() {
if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
@@ -1602,12 +1592,14 @@ where
if let Some(metrics) = self.metrics.as_ref() {
metrics.listeners_local_addresses.inc();
}
self.listen_addresses.lock().insert(address.clone());
},
SwarmEvent::ExpiredListenAddr { address, .. } => {
info!(target: "sub-libp2p", "📪 No longer listening on {}", address);
if let Some(metrics) = self.metrics.as_ref() {
metrics.listeners_local_addresses.dec();
}
self.listen_addresses.lock().remove(&address);
},
SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
if let Some(peer_id) = peer_id {
@@ -1712,6 +1704,12 @@ where
if let Some(metrics) = self.metrics.as_ref() {
metrics.listeners_local_addresses.sub(addresses.len() as u64);
}
let mut listen_addresses = self.listen_addresses.lock();
for addr in &addresses {
listen_addresses.remove(addr);
}
drop(listen_addresses);
let addrs =
addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
match reason {