diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index a03a6caa2f..e7aca1975c 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -54,6 +54,8 @@ pub enum BehaviourOut { BlockImport(BlockOrigin, Vec>), JustificationImport(Origin, B::Hash, NumberFor, Justification), FinalityProofImport(Origin, B::Hash, NumberFor, Vec), + /// Started a random Kademlia discovery query. + RandomKademliaStarted, Event(Event), } @@ -96,6 +98,11 @@ impl Behaviour { self.discovery.add_known_address(peer_id, addr) } + /// Returns the number of nodes that are in the Kademlia k-buckets. + pub fn num_kbuckets_entries(&mut self) -> usize { + self.discovery.num_kbuckets_entries() + } + /// Borrows `self` and returns a struct giving access to the information about a node. /// /// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes @@ -216,6 +223,9 @@ impl NetworkBehaviourEventProcess DiscoveryOut::ValuePutFailed(key) => { self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key)))); } + DiscoveryOut::RandomKademliaStarted => { + self.events.push(BehaviourOut::RandomKademliaStarted); + } } } } diff --git a/substrate/client/network/src/discovery.rs b/substrate/client/network/src/discovery.rs index 8360fce518..ecce7d81e3 100644 --- a/substrate/client/network/src/discovery.rs +++ b/substrate/client/network/src/discovery.rs @@ -176,6 +176,11 @@ impl DiscoveryBehaviour { pub fn put_value(&mut self, key: record::Key, value: Vec) { self.kademlia.put_record(Record::new(key, value), Quorum::All); } + + /// Returns the number of nodes that are in the Kademlia k-buckets. + pub fn num_kbuckets_entries(&mut self) -> usize { + self.kademlia.kbuckets_entries().count() + } } /// Event generated by the `DiscoveryBehaviour`. @@ -203,6 +208,9 @@ pub enum DiscoveryOut { /// Inserting a value into the DHT failed. ValuePutFailed(record::Key), + + /// Started a random Kademlia query. + RandomKademliaStarted, } impl NetworkBehaviour for DiscoveryBehaviour { @@ -330,25 +338,33 @@ impl NetworkBehaviour for DiscoveryBehaviour { // Poll the stream that fires when we need to start a random Kademlia query. while let Poll::Ready(_) = self.next_kad_random_query.poll_unpin(cx) { - if self.num_connections < self.discovery_only_if_under_num { + let actually_started = if self.num_connections < self.discovery_only_if_under_num { let random_peer_id = PeerId::random(); debug!(target: "sub-libp2p", "Libp2p <= Starting random Kademlia request for \ {:?}", random_peer_id); self.kademlia.get_closest_peers(random_peer_id); + true + } else { debug!( target: "sub-libp2p", "Kademlia paused due to high number of connections ({})", self.num_connections ); - } + false + }; // Schedule the next random query with exponentially increasing delay, // capped at 60 seconds. self.next_kad_random_query = Delay::new(self.duration_to_next_kad); self.duration_to_next_kad = cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60)); + + if actually_started { + let ev = DiscoveryOut::RandomKademliaStarted; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); + } } // Poll Kademlia. diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index c19a230769..17208aab50 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -539,6 +539,16 @@ impl Protocol { self.behaviour.is_open(peer_id) } + /// Returns the list of all the peers that the peerset currently requests us to be connected to. + pub fn requested_peers(&self) -> impl Iterator { + self.behaviour.requested_peers() + } + + /// Returns the number of discovered nodes that we keep in memory. + pub fn num_discovered_peers(&self) -> usize { + self.behaviour.num_discovered_peers() + } + /// Disconnects the given peer if we are connected to it. pub fn disconnect_peer(&mut self, peer_id: &PeerId) { self.behaviour.disconnect_peer(peer_id) diff --git a/substrate/client/network/src/protocol/generic_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs index 24e96681a0..727415baaf 100644 --- a/substrate/client/network/src/protocol/generic_proto/behaviour.rs +++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs @@ -191,6 +191,20 @@ impl PeerState { PeerState::Incoming { .. } => false, } } + + /// True if that node has been requested by the PSM. + fn is_requested(&self) -> bool { + match self { + PeerState::Poisoned => false, + PeerState::Banned { .. } => false, + PeerState::PendingRequest { .. } => true, + PeerState::Requested => true, + PeerState::Disabled { .. } => false, + PeerState::DisabledPendingEnable { .. } => true, + PeerState::Enabled { .. } => true, + PeerState::Incoming { .. } => false, + } + } } /// State of an "incoming" message sent to the peer set manager. @@ -277,6 +291,11 @@ impl GenericProto { self.notif_protocols.push((protocol_name.into(), engine_id, handshake_msg.into())); } + /// Returns the number of discovered nodes that we keep in memory. + pub fn num_discovered_peers(&self) -> usize { + self.peerset.num_discovered_peers() + } + /// Returns the list of all the peers we have an open channel to. pub fn open_peers<'a>(&'a self) -> impl Iterator + 'a { self.peers.iter().filter(|(_, state)| state.is_open()).map(|(id, _)| id) @@ -360,6 +379,11 @@ impl GenericProto { } } + /// Returns the list of all the peers that the peerset currently requests us to be connected to. + pub fn requested_peers<'a>(&'a self) -> impl Iterator + 'a { + self.peers.iter().filter(|(_, state)| state.is_requested()).map(|(id, _)| id) + } + /// Returns true if we try to open protocols with the given peer. pub fn is_enabled(&self, peer_id: &PeerId) -> bool { match self.peers.get(peer_id) { diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 5c618ac4ec..a220a009f3 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -25,7 +25,7 @@ //! The methods of the [`NetworkService`] are implemented by sending a message over a channel, //! which is then processed by [`NetworkWorker::poll`]. -use std::{borrow::Cow, collections::{HashMap, HashSet}, fs, marker::PhantomData, io, path::Path}; +use std::{borrow::Cow, collections::{HashMap, HashSet}, fs, marker::PhantomData, io, path::Path, str}; use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; use std::pin::Pin; use std::task::Poll; @@ -39,7 +39,7 @@ use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}; use parking_lot::Mutex; use sc_peerset::PeersetHandle; use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; -use prometheus_endpoint::{Registry, Gauge, U64, register, PrometheusError}; +use prometheus_endpoint::{Registry, Counter, CounterVec, Gauge, GaugeVec, Opts, U64, register, PrometheusError}; use crate::{behaviour::{Behaviour, BehaviourOut}, config::{parse_str_addr, parse_addr}}; use crate::{transport, config::NonReservedPeerMode, ReputationChange}; @@ -734,25 +734,108 @@ pub struct NetworkWorker { /// Senders for events that happen on the network. event_streams: Vec>, /// Prometheus network metrics. - metrics: Option + metrics: Option, } struct Metrics { + // This list is ordered alphabetically + connections: Gauge, + import_queue_blocks_submitted: Counter, + import_queue_finality_proofs_submitted: Counter, + import_queue_justifications_submitted: Counter, is_major_syncing: Gauge, + kbuckets_num_nodes: Gauge, + network_per_sec_bytes: GaugeVec, + notifications_total: CounterVec, + num_event_stream_channels: Gauge, + opened_notification_streams: GaugeVec, peers_count: Gauge, + peerset_num_discovered: Gauge, + peerset_num_requested: Gauge, + random_kademalia_queries_total: Counter, } impl Metrics { fn register(registry: &Registry) -> Result { Ok(Self { + // This list is ordered alphabetically + connections: register(Gauge::new( + "sub_libp2p_connections", "Number of libp2p connections" + )?, registry)?, + import_queue_blocks_submitted: register(Counter::new( + "import_queue_blocks_submitted", + "Number of blocks submitted to the import queue.", + )?, registry)?, + import_queue_finality_proofs_submitted: register(Counter::new( + "import_queue_finality_proofs_submitted", + "Number of finality proofs submitted to the import queue.", + )?, registry)?, + import_queue_justifications_submitted: register(Counter::new( + "import_queue_justifications_submitted", + "Number of justifications submitted to the import queue.", + )?, registry)?, is_major_syncing: register(Gauge::new( - "is_major_syncing", "Whether the node is performing a major sync or not.", + "sub_libp2p_is_major_syncing", "Whether the node is performing a major sync or not.", + )?, registry)?, + kbuckets_num_nodes: register(Gauge::new( + "sub_libp2p_kbuckets_num_nodes", "Number of nodes in the Kademlia k-buckets" + )?, registry)?, + network_per_sec_bytes: register(GaugeVec::new( + Opts::new( + "sub_libp2p_network_per_sec_bytes", + "Average bandwidth usage per second" + ), + &["direction"] + )?, registry)?, + notifications_total: register(CounterVec::new( + Opts::new( + "sub_libp2p_notifications_total", + "Number of notification received from all nodes" + ), + &["direction", "protocol"] + )?, registry)?, + num_event_stream_channels: register(Gauge::new( + "sub_libp2p_num_event_stream_channels", + "Number of internal active channels that broadcast network events", + )?, registry)?, + opened_notification_streams: register(GaugeVec::new( + Opts::new( + "sub_libp2p_opened_notification_streams", + "Number of open notification substreams" + ), + &["protocol"] )?, registry)?, peers_count: register(Gauge::new( - "peers_count", "Number of network gossip peers", + "sub_libp2p_peers_count", "Number of network gossip peers", + )?, registry)?, + peerset_num_discovered: register(Gauge::new( + "sub_libp2p_peerset_num_discovered", "Number of nodes stored in the peerset manager", + )?, registry)?, + peerset_num_requested: register(Gauge::new( + "sub_libp2p_peerset_num_requested", "Number of nodes that the peerset manager wants us to be connected to", + )?, registry)?, + random_kademalia_queries_total: register(Counter::new( + "sub_libp2p_random_kademalia_queries_total", "Number of random Kademlia queries started", )?, registry)?, }) } + + fn update_with_network_event(&self, event: &Event) { + match event { + Event::NotificationStreamOpened { engine_id, .. } => { + self.opened_notification_streams.with_label_values(&[&engine_id_to_string(&engine_id)]).inc(); + }, + Event::NotificationStreamClosed { engine_id, .. } => { + self.opened_notification_streams.with_label_values(&[&engine_id_to_string(&engine_id)]).dec(); + }, + Event::NotificationsReceived { messages, .. } => { + for (engine_id, _) in messages { + self.notifications_total.with_label_values(&["in", &engine_id_to_string(&engine_id)]).inc(); + } + }, + _ => {} + } + } } impl Future for NetworkWorker { @@ -800,10 +883,15 @@ impl Future for NetworkWorker { this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number), ServiceToWorkerMsg::EventStream(sender) => this.event_streams.push(sender), - ServiceToWorkerMsg::WriteNotification { message, engine_id, target } => - this.network_service.user_protocol_mut().write_notification(target, engine_id, message), + ServiceToWorkerMsg::WriteNotification { message, engine_id, target } => { + if let Some(metrics) = this.metrics.as_ref() { + metrics.notifications_total.with_label_values(&["out", &engine_id_to_string(&engine_id)]).inc(); + } + this.network_service.user_protocol_mut().write_notification(target, engine_id, message) + }, ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => { - let events = this.network_service.user_protocol_mut().register_notifications_protocol(engine_id, protocol_name); + let events = this.network_service.user_protocol_mut() + .register_notifications_protocol(engine_id, protocol_name); for event in events { this.event_streams.retain(|sender| sender.unbounded_send(event.clone()).is_ok()); } @@ -821,18 +909,47 @@ impl Future for NetworkWorker { match poll_value { Poll::Pending => break, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::BlockImport(origin, blocks))) => - this.import_queue.import_blocks(origin, blocks), - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::JustificationImport(origin, hash, nb, justification))) => - this.import_queue.import_justification(origin, hash, nb, justification), - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::FinalityProofImport(origin, hash, nb, proof))) => - this.import_queue.import_finality_proof(origin, hash, nb, proof), - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => - this.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok()), - Poll::Ready(SwarmEvent::Connected(peer_id)) => - trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id), - Poll::Ready(SwarmEvent::Disconnected(peer_id)) => - trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?})", peer_id), + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::BlockImport(origin, blocks))) => { + if let Some(metrics) = this.metrics.as_ref() { + metrics.import_queue_blocks_submitted.inc(); + } + this.import_queue.import_blocks(origin, blocks); + }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::JustificationImport(origin, hash, nb, justification))) => { + if let Some(metrics) = this.metrics.as_ref() { + metrics.import_queue_justifications_submitted.inc(); + } + this.import_queue.import_justification(origin, hash, nb, justification); + }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::FinalityProofImport(origin, hash, nb, proof))) => { + if let Some(metrics) = this.metrics.as_ref() { + metrics.import_queue_finality_proofs_submitted.inc(); + } + this.import_queue.import_finality_proof(origin, hash, nb, proof); + }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted)) => { + if let Some(metrics) = this.metrics.as_ref() { + metrics.random_kademalia_queries_total.inc(); + } + }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => { + this.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok()); + if let Some(metrics) = this.metrics.as_ref() { + metrics.update_with_network_event(&ev); + } + }, + Poll::Ready(SwarmEvent::Connected(peer_id)) => { + trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id); + if let Some(metrics) = this.metrics.as_ref() { + metrics.connections.inc(); + } + }, + Poll::Ready(SwarmEvent::Disconnected(peer_id)) => { + trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?})", peer_id); + if let Some(metrics) = this.metrics.as_ref() { + metrics.connections.dec(); + } + }, Poll::Ready(SwarmEvent::NewListenAddr(addr)) => trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", addr), Poll::Ready(SwarmEvent::ExpiredListenAddr(addr)) => @@ -861,8 +978,14 @@ impl Future for NetworkWorker { this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed); if let Some(metrics) = this.metrics.as_ref() { + metrics.network_per_sec_bytes.with_label_values(&["in"]).set(this.service.bandwidth.average_download_per_sec()); + metrics.network_per_sec_bytes.with_label_values(&["out"]).set(this.service.bandwidth.average_upload_per_sec()); metrics.is_major_syncing.set(is_major_syncing as u64); + metrics.kbuckets_num_nodes.set(this.network_service.num_kbuckets_entries() as u64); + metrics.num_event_stream_channels.set(this.event_streams.len() as u64); metrics.peers_count.set(num_connected_peers as u64); + metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64); + metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64); } Poll::Pending @@ -872,6 +995,15 @@ impl Future for NetworkWorker { impl Unpin for NetworkWorker { } +/// Turns a `ConsensusEngineId` into a representable string. +fn engine_id_to_string(id: &ConsensusEngineId) -> Cow { + if let Ok(s) = std::str::from_utf8(&id[..]) { + Cow::Borrowed(s) + } else { + Cow::Owned(format!("{:?}", id)) + } +} + /// The libp2p swarm, customized for our needs. type Swarm = libp2p::swarm::Swarm>; diff --git a/substrate/client/peerset/src/lib.rs b/substrate/client/peerset/src/lib.rs index bd6c19a62d..87ed2336ae 100644 --- a/substrate/client/peerset/src/lib.rs +++ b/substrate/client/peerset/src/lib.rs @@ -518,6 +518,11 @@ impl Peerset { }) } + /// Returns the number of peers that we have discovered. + pub fn num_discovered_peers(&self) -> usize { + self.data.peers().len() + } + /// Returns priority group by id. pub fn get_priority_group(&self, group_id: &str) -> Option> { self.data.get_priority_group(group_id) diff --git a/substrate/client/peerset/src/peersstate.rs b/substrate/client/peerset/src/peersstate.rs index 7abf17a5f8..843ec0a360 100644 --- a/substrate/client/peerset/src/peersstate.rs +++ b/substrate/client/peerset/src/peersstate.rs @@ -144,7 +144,7 @@ impl PeersState { /// Returns the list of all the peers we know of. // Note: this method could theoretically return a `Peer`, but implementing that // isn't simple. - pub fn peers(&self) -> impl Iterator { + pub fn peers(&self) -> impl ExactSizeIterator { self.nodes.keys() }