|
|
|
@@ -25,7 +25,7 @@
|
|
|
|
|
//! The methods of the [`NetworkService`] are implemented by sending a message over a channel,
|
|
|
|
|
//! which is then processed by [`NetworkWorker::poll`].
|
|
|
|
|
|
|
|
|
|
use std::{borrow::Cow, collections::{HashMap, HashSet}, fs, marker::PhantomData, io, path::Path};
|
|
|
|
|
use std::{borrow::Cow, collections::{HashMap, HashSet}, fs, marker::PhantomData, io, path::Path, str};
|
|
|
|
|
use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}};
|
|
|
|
|
use std::pin::Pin;
|
|
|
|
|
use std::task::Poll;
|
|
|
|
@@ -39,7 +39,7 @@ use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent};
|
|
|
|
|
use parking_lot::Mutex;
|
|
|
|
|
use sc_peerset::PeersetHandle;
|
|
|
|
|
use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
|
|
|
|
|
use prometheus_endpoint::{Registry, Gauge, U64, register, PrometheusError};
|
|
|
|
|
use prometheus_endpoint::{Registry, Counter, CounterVec, Gauge, GaugeVec, Opts, U64, register, PrometheusError};
|
|
|
|
|
|
|
|
|
|
use crate::{behaviour::{Behaviour, BehaviourOut}, config::{parse_str_addr, parse_addr}};
|
|
|
|
|
use crate::{transport, config::NonReservedPeerMode, ReputationChange};
|
|
|
|
@@ -734,25 +734,108 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
|
|
|
|
|
/// Senders for events that happen on the network.
|
|
|
|
|
event_streams: Vec<mpsc::UnboundedSender<Event>>,
|
|
|
|
|
/// Prometheus network metrics.
|
|
|
|
|
metrics: Option<Metrics>
|
|
|
|
|
metrics: Option<Metrics>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct Metrics {
|
|
|
|
|
// This list is ordered alphabetically
|
|
|
|
|
connections: Gauge<U64>,
|
|
|
|
|
import_queue_blocks_submitted: Counter<U64>,
|
|
|
|
|
import_queue_finality_proofs_submitted: Counter<U64>,
|
|
|
|
|
import_queue_justifications_submitted: Counter<U64>,
|
|
|
|
|
is_major_syncing: Gauge<U64>,
|
|
|
|
|
kbuckets_num_nodes: Gauge<U64>,
|
|
|
|
|
network_per_sec_bytes: GaugeVec<U64>,
|
|
|
|
|
notifications_total: CounterVec<U64>,
|
|
|
|
|
num_event_stream_channels: Gauge<U64>,
|
|
|
|
|
opened_notification_streams: GaugeVec<U64>,
|
|
|
|
|
peers_count: Gauge<U64>,
|
|
|
|
|
peerset_num_discovered: Gauge<U64>,
|
|
|
|
|
peerset_num_requested: Gauge<U64>,
|
|
|
|
|
random_kademalia_queries_total: Counter<U64>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Metrics {
|
|
|
|
|
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
|
|
|
|
|
Ok(Self {
|
|
|
|
|
// This list is ordered alphabetically
|
|
|
|
|
connections: register(Gauge::new(
|
|
|
|
|
"sub_libp2p_connections", "Number of libp2p connections"
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
import_queue_blocks_submitted: register(Counter::new(
|
|
|
|
|
"import_queue_blocks_submitted",
|
|
|
|
|
"Number of blocks submitted to the import queue.",
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
import_queue_finality_proofs_submitted: register(Counter::new(
|
|
|
|
|
"import_queue_finality_proofs_submitted",
|
|
|
|
|
"Number of finality proofs submitted to the import queue.",
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
import_queue_justifications_submitted: register(Counter::new(
|
|
|
|
|
"import_queue_justifications_submitted",
|
|
|
|
|
"Number of justifications submitted to the import queue.",
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
is_major_syncing: register(Gauge::new(
|
|
|
|
|
"is_major_syncing", "Whether the node is performing a major sync or not.",
|
|
|
|
|
"sub_libp2p_is_major_syncing", "Whether the node is performing a major sync or not.",
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
kbuckets_num_nodes: register(Gauge::new(
|
|
|
|
|
"sub_libp2p_kbuckets_num_nodes", "Number of nodes in the Kademlia k-buckets"
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
network_per_sec_bytes: register(GaugeVec::new(
|
|
|
|
|
Opts::new(
|
|
|
|
|
"sub_libp2p_network_per_sec_bytes",
|
|
|
|
|
"Average bandwidth usage per second"
|
|
|
|
|
),
|
|
|
|
|
&["direction"]
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
notifications_total: register(CounterVec::new(
|
|
|
|
|
Opts::new(
|
|
|
|
|
"sub_libp2p_notifications_total",
|
|
|
|
|
"Number of notification received from all nodes"
|
|
|
|
|
),
|
|
|
|
|
&["direction", "protocol"]
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
num_event_stream_channels: register(Gauge::new(
|
|
|
|
|
"sub_libp2p_num_event_stream_channels",
|
|
|
|
|
"Number of internal active channels that broadcast network events",
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
opened_notification_streams: register(GaugeVec::new(
|
|
|
|
|
Opts::new(
|
|
|
|
|
"sub_libp2p_opened_notification_streams",
|
|
|
|
|
"Number of open notification substreams"
|
|
|
|
|
),
|
|
|
|
|
&["protocol"]
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
peers_count: register(Gauge::new(
|
|
|
|
|
"peers_count", "Number of network gossip peers",
|
|
|
|
|
"sub_libp2p_peers_count", "Number of network gossip peers",
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
peerset_num_discovered: register(Gauge::new(
|
|
|
|
|
"sub_libp2p_peerset_num_discovered", "Number of nodes stored in the peerset manager",
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
peerset_num_requested: register(Gauge::new(
|
|
|
|
|
"sub_libp2p_peerset_num_requested", "Number of nodes that the peerset manager wants us to be connected to",
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
random_kademalia_queries_total: register(Counter::new(
|
|
|
|
|
"sub_libp2p_random_kademalia_queries_total", "Number of random Kademlia queries started",
|
|
|
|
|
)?, registry)?,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn update_with_network_event(&self, event: &Event) {
|
|
|
|
|
match event {
|
|
|
|
|
Event::NotificationStreamOpened { engine_id, .. } => {
|
|
|
|
|
self.opened_notification_streams.with_label_values(&[&engine_id_to_string(&engine_id)]).inc();
|
|
|
|
|
},
|
|
|
|
|
Event::NotificationStreamClosed { engine_id, .. } => {
|
|
|
|
|
self.opened_notification_streams.with_label_values(&[&engine_id_to_string(&engine_id)]).dec();
|
|
|
|
|
},
|
|
|
|
|
Event::NotificationsReceived { messages, .. } => {
|
|
|
|
|
for (engine_id, _) in messages {
|
|
|
|
|
self.notifications_total.with_label_values(&["in", &engine_id_to_string(&engine_id)]).inc();
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
_ => {}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
|
|
|
|
@@ -800,10 +883,15 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
|
|
|
|
|
this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
|
|
|
|
|
ServiceToWorkerMsg::EventStream(sender) =>
|
|
|
|
|
this.event_streams.push(sender),
|
|
|
|
|
ServiceToWorkerMsg::WriteNotification { message, engine_id, target } =>
|
|
|
|
|
this.network_service.user_protocol_mut().write_notification(target, engine_id, message),
|
|
|
|
|
ServiceToWorkerMsg::WriteNotification { message, engine_id, target } => {
|
|
|
|
|
if let Some(metrics) = this.metrics.as_ref() {
|
|
|
|
|
metrics.notifications_total.with_label_values(&["out", &engine_id_to_string(&engine_id)]).inc();
|
|
|
|
|
}
|
|
|
|
|
this.network_service.user_protocol_mut().write_notification(target, engine_id, message)
|
|
|
|
|
},
|
|
|
|
|
ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => {
|
|
|
|
|
let events = this.network_service.user_protocol_mut().register_notifications_protocol(engine_id, protocol_name);
|
|
|
|
|
let events = this.network_service.user_protocol_mut()
|
|
|
|
|
.register_notifications_protocol(engine_id, protocol_name);
|
|
|
|
|
for event in events {
|
|
|
|
|
this.event_streams.retain(|sender| sender.unbounded_send(event.clone()).is_ok());
|
|
|
|
|
}
|
|
|
|
@@ -821,18 +909,47 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
|
|
|
|
|
|
|
|
|
|
match poll_value {
|
|
|
|
|
Poll::Pending => break,
|
|
|
|
|
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::BlockImport(origin, blocks))) =>
|
|
|
|
|
this.import_queue.import_blocks(origin, blocks),
|
|
|
|
|
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::JustificationImport(origin, hash, nb, justification))) =>
|
|
|
|
|
this.import_queue.import_justification(origin, hash, nb, justification),
|
|
|
|
|
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::FinalityProofImport(origin, hash, nb, proof))) =>
|
|
|
|
|
this.import_queue.import_finality_proof(origin, hash, nb, proof),
|
|
|
|
|
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) =>
|
|
|
|
|
this.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok()),
|
|
|
|
|
Poll::Ready(SwarmEvent::Connected(peer_id)) =>
|
|
|
|
|
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id),
|
|
|
|
|
Poll::Ready(SwarmEvent::Disconnected(peer_id)) =>
|
|
|
|
|
trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?})", peer_id),
|
|
|
|
|
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::BlockImport(origin, blocks))) => {
|
|
|
|
|
if let Some(metrics) = this.metrics.as_ref() {
|
|
|
|
|
metrics.import_queue_blocks_submitted.inc();
|
|
|
|
|
}
|
|
|
|
|
this.import_queue.import_blocks(origin, blocks);
|
|
|
|
|
},
|
|
|
|
|
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::JustificationImport(origin, hash, nb, justification))) => {
|
|
|
|
|
if let Some(metrics) = this.metrics.as_ref() {
|
|
|
|
|
metrics.import_queue_justifications_submitted.inc();
|
|
|
|
|
}
|
|
|
|
|
this.import_queue.import_justification(origin, hash, nb, justification);
|
|
|
|
|
},
|
|
|
|
|
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::FinalityProofImport(origin, hash, nb, proof))) => {
|
|
|
|
|
if let Some(metrics) = this.metrics.as_ref() {
|
|
|
|
|
metrics.import_queue_finality_proofs_submitted.inc();
|
|
|
|
|
}
|
|
|
|
|
this.import_queue.import_finality_proof(origin, hash, nb, proof);
|
|
|
|
|
},
|
|
|
|
|
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted)) => {
|
|
|
|
|
if let Some(metrics) = this.metrics.as_ref() {
|
|
|
|
|
metrics.random_kademalia_queries_total.inc();
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => {
|
|
|
|
|
this.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok());
|
|
|
|
|
if let Some(metrics) = this.metrics.as_ref() {
|
|
|
|
|
metrics.update_with_network_event(&ev);
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
Poll::Ready(SwarmEvent::Connected(peer_id)) => {
|
|
|
|
|
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
|
|
|
|
|
if let Some(metrics) = this.metrics.as_ref() {
|
|
|
|
|
metrics.connections.inc();
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
Poll::Ready(SwarmEvent::Disconnected(peer_id)) => {
|
|
|
|
|
trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?})", peer_id);
|
|
|
|
|
if let Some(metrics) = this.metrics.as_ref() {
|
|
|
|
|
metrics.connections.dec();
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
Poll::Ready(SwarmEvent::NewListenAddr(addr)) =>
|
|
|
|
|
trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", addr),
|
|
|
|
|
Poll::Ready(SwarmEvent::ExpiredListenAddr(addr)) =>
|
|
|
|
@@ -861,8 +978,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
|
|
|
|
|
this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
|
|
|
|
|
|
|
|
|
|
if let Some(metrics) = this.metrics.as_ref() {
|
|
|
|
|
metrics.network_per_sec_bytes.with_label_values(&["in"]).set(this.service.bandwidth.average_download_per_sec());
|
|
|
|
|
metrics.network_per_sec_bytes.with_label_values(&["out"]).set(this.service.bandwidth.average_upload_per_sec());
|
|
|
|
|
metrics.is_major_syncing.set(is_major_syncing as u64);
|
|
|
|
|
metrics.kbuckets_num_nodes.set(this.network_service.num_kbuckets_entries() as u64);
|
|
|
|
|
metrics.num_event_stream_channels.set(this.event_streams.len() as u64);
|
|
|
|
|
metrics.peers_count.set(num_connected_peers as u64);
|
|
|
|
|
metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
|
|
|
|
|
metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Poll::Pending
|
|
|
|
@@ -872,6 +995,15 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
|
|
|
|
|
impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Turns a `ConsensusEngineId` into a representable string.
|
|
|
|
|
fn engine_id_to_string(id: &ConsensusEngineId) -> Cow<str> {
|
|
|
|
|
if let Ok(s) = std::str::from_utf8(&id[..]) {
|
|
|
|
|
Cow::Borrowed(s)
|
|
|
|
|
} else {
|
|
|
|
|
Cow::Owned(format!("{:?}", id))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// The libp2p swarm, customized for our needs.
|
|
|
|
|
type Swarm<B, H> = libp2p::swarm::Swarm<Behaviour<B, H>>;
|
|
|
|
|
|
|
|
|
|