Update to libp2p-0.23. (#6870)

* Update to libp2p-0.23.

Thereby incorporate bandwidth measurement along the
lines previously done by libp2p itself.

* Tweak dependencies for wasm32 compilation.

For wasm32 we need to enable unstable features to
make `task::Builder::local` available.

* Simplify dependencies.

* Simplify.

Leave the calculation of bytes sent/received per second
to the outer layers of the code, subject to their own
individual update intervals.

* Cleanup

* Re-add lost dev dependency.

* Avoid division by zero.

* Remove redundant metric.

* Enable sending of noise legacy handshakes.

* Add comment about monotonic gauge.

* CI
This commit is contained in:
Roman Borschel
2020-08-14 10:41:47 +02:00
committed by GitHub
parent f94aae1d2d
commit 327e02942c
18 changed files with 129 additions and 75 deletions
+11 -4
View File
@@ -300,6 +300,12 @@ dependencies = [
"webpki-roots 0.19.0",
]
[[package]]
name = "atomic"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64f46ca51dca4837f1520754d1c8c36636356b81553d928dc9c177025369a06e"
[[package]]
name = "atty"
version = "0.2.14"
@@ -2797,10 +2803,11 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a"
[[package]]
name = "libp2p"
version = "0.22.0"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0306a49ee6a89468f96089906f36b0eef82c988dcfc8acf3e2dcd6ad1c859f85"
checksum = "b1ebb6c031584a5af181fe3a1e4b074af5d0b1a3b31663200f0251f4bcff6b5c"
dependencies = [
"atomic",
"bytes 0.5.4",
"futures 0.3.5",
"lazy_static",
@@ -2965,9 +2972,9 @@ dependencies = [
[[package]]
name = "libp2p-noise"
version = "0.21.0"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f353f8966bbaaf7456535fffd3f366f153148773a0cf04b2ec3860955cb720e"
checksum = "1e594f2de0c23c2b7ad14802c991a2e68e95315c6a6c7715e53801506f20135d"
dependencies = [
"bytes 0.5.4",
"curve25519-dalek",
@@ -8,7 +8,7 @@ license = "Apache-2.0"
[dependencies]
futures-timer = "3.0.2"
libp2p = { version = "0.22.0", default-features = false }
libp2p = { version = "0.23.0", default-features = false }
jsonrpc-core = "14.2.0"
serde = "1.0.106"
serde_json = "1.0.48"
+1 -1
View File
@@ -33,7 +33,7 @@ derive_more = { version = "0.99.2" }
sc-rpc = { version = "2.0.0-rc5", path = "../../../client/rpc" }
jsonrpc-core-client = { version = "14.2.0", features = ["http"] }
hyper = "0.12.35"
libp2p = { version = "0.22.0", default-features = false }
libp2p = { version = "0.23.0", default-features = false }
serde_json = "1.0"
[features]
@@ -21,7 +21,7 @@ codec = { package = "parity-scale-codec", default-features = false, version = "1
derive_more = "0.99.2"
futures = "0.3.4"
futures-timer = "3.0.1"
libp2p = { version = "0.22.0", default-features = false, features = ["kad"] }
libp2p = { version = "0.23.0", default-features = false, features = ["kad"] }
log = "0.4.8"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-rc5"}
prost = "0.6.1"
+27 -5
View File
@@ -45,6 +45,10 @@ pub struct InformantDisplay<B: BlockT> {
last_number: Option<NumberFor<B>>,
/// The last time `display` or `new` has been called.
last_update: Instant,
/// The last seen total of bytes received.
last_total_bytes_inbound: u64,
/// The last seen total of bytes sent.
last_total_bytes_outbound: u64,
/// The format to print output in.
format: OutputFormat,
}
@@ -55,6 +59,8 @@ impl<B: BlockT> InformantDisplay<B> {
InformantDisplay {
last_number: None,
last_update: Instant::now(),
last_total_bytes_inbound: 0,
last_total_bytes_outbound: 0,
format,
}
}
@@ -66,9 +72,25 @@ impl<B: BlockT> InformantDisplay<B> {
let finalized_number = info.chain.finalized_number;
let num_connected_peers = net_status.num_connected_peers;
let speed = speed::<B>(best_number, self.last_number, self.last_update);
self.last_update = Instant::now();
let total_bytes_inbound = net_status.total_bytes_inbound;
let total_bytes_outbound = net_status.total_bytes_outbound;
let now = Instant::now();
let elapsed = (now - self.last_update).as_secs();
self.last_update = now;
self.last_number = Some(best_number);
let diff_bytes_inbound = total_bytes_inbound - self.last_total_bytes_inbound;
let diff_bytes_outbound = total_bytes_outbound - self.last_total_bytes_outbound;
let (avg_bytes_per_sec_inbound, avg_bytes_per_sec_outbound) =
if elapsed > 0 {
self.last_total_bytes_inbound = total_bytes_inbound;
self.last_total_bytes_outbound = total_bytes_outbound;
(diff_bytes_inbound / elapsed, diff_bytes_outbound / elapsed)
} else {
(diff_bytes_inbound, diff_bytes_outbound)
};
let (level, status, target) = match (net_status.sync_state, net_status.best_seen_block) {
(SyncState::Idle, _) => ("💤", "Idle".into(), "".into()),
(SyncState::Downloading, None) => ("⚙️ ", format!("Preparing{}", speed), "".into()),
@@ -92,8 +114,8 @@ impl<B: BlockT> InformantDisplay<B> {
best_hash,
Colour::White.bold().paint(format!("{}", finalized_number)),
info.chain.finalized_hash,
Colour::Green.paint(format!("{}", TransferRateFormat(net_status.average_download_per_sec))),
Colour::Red.paint(format!("{}", TransferRateFormat(net_status.average_upload_per_sec))),
Colour::Green.paint(format!("{}", TransferRateFormat(avg_bytes_per_sec_inbound))),
Colour::Red.paint(format!("{}", TransferRateFormat(avg_bytes_per_sec_outbound))),
)
} else {
info!(
@@ -108,8 +130,8 @@ impl<B: BlockT> InformantDisplay<B> {
best_hash,
finalized_number,
info.chain.finalized_hash,
TransferRateFormat(net_status.average_download_per_sec),
TransferRateFormat(net_status.average_upload_per_sec),
TransferRateFormat(avg_bytes_per_sec_inbound),
TransferRateFormat(avg_bytes_per_sec_outbound),
)
}
}
+1 -1
View File
@@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
futures = "0.3.4"
futures-timer = "3.0.1"
libp2p = { version = "0.22.0", default-features = false }
libp2p = { version = "0.23.0", default-features = false }
log = "0.4.8"
lru = "0.4.3"
sc-network = { version = "0.8.0-rc5", path = "../network" }
+3 -3
View File
@@ -26,7 +26,7 @@ erased-serde = "0.3.9"
fnv = "1.0.6"
fork-tree = { version = "2.0.0-rc5", path = "../../utils/fork-tree" }
futures = "0.3.4"
futures-timer = "3.0.1"
futures-timer = "3.0.2"
futures_codec = "0.4.0"
hex = "0.4.0"
ip_network = "0.3.4"
@@ -61,7 +61,7 @@ wasm-timer = "0.2"
zeroize = "1.0.0"
[dependencies.libp2p]
version = "0.22.0"
version = "0.23.0"
default-features = false
features = ["identify", "kad", "mdns", "mplex", "noise", "ping", "tcp-async-std", "websocket", "yamux"]
@@ -69,7 +69,7 @@ features = ["identify", "kad", "mdns", "mplex", "noise", "ping", "tcp-async-std"
async-std = "1.6.2"
assert_matches = "1.3"
env_logger = "0.7.0"
libp2p = { version = "0.22.0", default-features = false, features = ["secio"] }
libp2p = { version = "0.23.0", default-features = false, features = ["secio"] }
quickcheck = "0.9.0"
rand = "0.7.2"
sp-keyring = { version = "2.0.0-rc5", path = "../../primitives/keyring" }
+4 -4
View File
@@ -309,8 +309,8 @@ pub struct NetworkStatus<B: BlockT> {
pub num_connected_peers: usize,
/// Total number of active peers.
pub num_active_peers: usize,
/// Downloaded bytes per second averaged over the past few seconds.
pub average_download_per_sec: u64,
/// Uploaded bytes per second averaged over the past few seconds.
pub average_upload_per_sec: u64,
/// The total number of bytes received.
pub total_bytes_inbound: u64,
/// The total number of bytes sent.
pub total_bytes_outbound: u64,
}
@@ -43,10 +43,10 @@ pub struct NetworkState {
pub connected_peers: HashMap<String, Peer>,
/// List of node that we know of but that we're not connected to.
pub not_connected_peers: HashMap<String, NotConnectedPeer>,
/// Downloaded bytes per second averaged over the past few seconds.
pub average_download_per_sec: u64,
/// Uploaded bytes per second averaged over the past few seconds.
pub average_upload_per_sec: u64,
/// The total number of bytes received.
pub total_bytes_inbound: u64,
/// The total number of bytes sent.
pub total_bytes_outbound: u64,
/// State of the peerset manager.
pub peerset: serde_json::Value,
}
+16 -14
View File
@@ -402,14 +402,14 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
})
}
/// Returns the downloaded bytes per second averaged over the past few seconds.
pub fn average_download_per_sec(&self) -> u64 {
self.service.bandwidth.average_download_per_sec()
/// Returns the total number of bytes received so far.
pub fn total_bytes_inbound(&self) -> u64 {
self.service.bandwidth.total_inbound()
}
/// Returns the uploaded bytes per second averaged over the past few seconds.
pub fn average_upload_per_sec(&self) -> u64 {
self.service.bandwidth.average_upload_per_sec()
/// Returns the total number of bytes sent so far.
pub fn total_bytes_outbound(&self) -> u64 {
self.service.bandwidth.total_outbound()
}
/// Returns the number of peers we're connected to.
@@ -541,8 +541,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
peer_id: Swarm::<B, H>::local_peer_id(&swarm).to_base58(),
listened_addresses: Swarm::<B, H>::listeners(&swarm).cloned().collect(),
external_addresses: Swarm::<B, H>::external_addresses(&swarm).cloned().collect(),
average_download_per_sec: self.service.bandwidth.average_download_per_sec(),
average_upload_per_sec: self.service.bandwidth.average_upload_per_sec(),
total_bytes_inbound: self.service.bandwidth.total_inbound(),
total_bytes_outbound: self.service.bandwidth.total_outbound(),
connected_peers,
not_connected_peers,
peerset: swarm.user_protocol_mut().peerset_debug_info(),
@@ -1148,7 +1148,9 @@ struct Metrics {
kbuckets_num_nodes: GaugeVec<U64>,
listeners_local_addresses: Gauge<U64>,
listeners_errors_total: Counter<U64>,
network_per_sec_bytes: GaugeVec<U64>,
// Note: `network_bytes_total` is a monotonic gauge obtained by
// sampling an existing counter.
network_bytes_total: GaugeVec<U64>,
notifications_sizes: HistogramVec,
notifications_streams_closed_total: CounterVec<U64>,
notifications_streams_opened_total: CounterVec<U64>,
@@ -1265,10 +1267,10 @@ impl Metrics {
"sub_libp2p_listeners_errors_total",
"Total number of non-fatal errors reported by a listener"
)?, registry)?,
network_per_sec_bytes: register(GaugeVec::new(
network_bytes_total: register(GaugeVec::new(
Opts::new(
"sub_libp2p_network_per_sec_bytes",
"Average bandwidth usage per second"
"sub_libp2p_network_bytes_total",
"Total bandwidth usage"
),
&["direction"]
)?, registry)?,
@@ -1719,8 +1721,8 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
if let Some(metrics) = this.metrics.as_ref() {
metrics.network_per_sec_bytes.with_label_values(&["in"]).set(this.service.bandwidth.average_download_per_sec());
metrics.network_per_sec_bytes.with_label_values(&["out"]).set(this.service.bandwidth.average_upload_per_sec());
metrics.network_bytes_total.with_label_values(&["in"]).set(this.service.bandwidth.total_inbound());
metrics.network_bytes_total.with_label_values(&["out"]).set(this.service.bandwidth.total_outbound());
metrics.is_major_syncing.set(is_major_syncing as u64);
for (proto, num_entries) in this.network_service.num_kbuckets_entries() {
let proto = maybe_utf8_bytes_to_string(proto.as_bytes());
+15 -8
View File
@@ -27,7 +27,7 @@ use libp2p::{
};
#[cfg(not(target_os = "unknown"))]
use libp2p::{tcp, dns, websocket};
use std::{io, sync::Arc, time::Duration, usize};
use std::{io, sync::Arc, time::Duration};
pub use self::bandwidth::BandwidthSinks;
@@ -43,7 +43,11 @@ pub fn build_transport(
memory_only: bool,
wasm_external_transport: Option<wasm_ext::ExtTransport>,
use_yamux_flow_control: bool
) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc<bandwidth::BandwidthSinks>) {
) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc<BandwidthSinks>) {
// Legacy noise configurations for backward compatibility.
let mut noise_legacy = noise::LegacyConfig::default();
noise_legacy.send_legacy_handshake = true;
// Build configuration objects for encryption mechanisms.
let noise_config = {
// For more information about these two panics, see in "On the Importance of
@@ -58,10 +62,12 @@ pub fn build_transport(
once and at initialization, we're taking the bet that the inconvenience of a very \
rare panic here is basically zero");
core::upgrade::SelectUpgrade::new(
noise::NoiseConfig::xx(noise_keypair_spec),
noise::NoiseConfig::ix(noise_keypair_legacy)
)
let mut xx_config = noise::NoiseConfig::xx(noise_keypair_spec);
xx_config.set_legacy_config(noise_legacy.clone());
let mut ix_config = noise::NoiseConfig::ix(noise_keypair_legacy);
ix_config.set_legacy_config(noise_legacy);
core::upgrade::SelectUpgrade::new(xx_config, ix_config)
};
// Build configuration objects for multiplexing mechanisms.
@@ -104,7 +110,7 @@ pub fn build_transport(
OptionalTransport::none()
});
let (transport, sinks) = bandwidth::BandwidthLogging::new(transport, Duration::from_secs(5));
let (transport, bandwidth) = bandwidth::BandwidthLogging::new(transport);
// Encryption
let transport = transport.and_then(move |stream, endpoint| {
@@ -145,5 +151,6 @@ pub fn build_transport(
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed();
(transport, sinks)
(transport, bandwidth)
}
+1 -1
View File
@@ -19,7 +19,7 @@ parking_lot = "0.10.0"
futures = "0.3.4"
futures-timer = "3.0.1"
rand = "0.7.2"
libp2p = { version = "0.22.0", default-features = false }
libp2p = { version = "0.23.0", default-features = false }
sp-consensus = { version = "0.8.0-rc5", path = "../../../primitives/consensus/common" }
sc-consensus = { version = "0.8.0-rc5", path = "../../../client/consensus/common" }
sc-client-api = { version = "2.0.0-rc5", path = "../../api" }
+1 -1
View File
@@ -15,7 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
futures = "0.3.4"
libp2p = { version = "0.22.0", default-features = false }
libp2p = { version = "0.23.0", default-features = false }
sp-utils = { version = "2.0.0-rc5", path = "../../primitives/utils"}
log = "0.4.8"
serde_json = "1.0.41"
+4 -4
View File
@@ -87,8 +87,8 @@ fn api<T: Into<Option<Status>>>(sync: T) -> System<Block> {
external_addresses: Default::default(),
connected_peers: Default::default(),
not_connected_peers: Default::default(),
average_download_per_sec: 0,
average_upload_per_sec: 0,
total_bytes_inbound: 0,
total_bytes_outbound: 0,
peerset: serde_json::Value::Null,
}).unwrap());
},
@@ -282,8 +282,8 @@ fn system_network_state() {
external_addresses: Default::default(),
connected_peers: Default::default(),
not_connected_peers: Default::default(),
average_download_per_sec: 0,
average_upload_per_sec: 0,
total_bytes_inbound: 0,
total_bytes_outbound: 0,
peerset: serde_json::Value::Null,
}
);
+2 -2
View File
@@ -322,8 +322,8 @@ async fn build_network_future<
num_sync_peers: network.num_sync_peers(),
num_connected_peers: network.num_connected_peers(),
num_active_peers: network.num_active_peers(),
average_download_per_sec: network.average_download_per_sec(),
average_upload_per_sec: network.average_upload_per_sec(),
total_bytes_inbound: network.total_bytes_inbound(),
total_bytes_outbound: network.total_bytes_outbound(),
};
let state = network.network_state();
ready_sink.send((status, state));
+35 -19
View File
@@ -26,6 +26,7 @@ use sp_transaction_pool::PoolStatus;
use sp_utils::metrics::register_globals;
use sc_client_api::ClientInfo;
use sc_network::config::Role;
use wasm_timer::Instant;
struct PrometheusMetrics {
// generic info
@@ -34,7 +35,6 @@ struct PrometheusMetrics {
ready_transactions_number: Gauge<U64>,
// I/O
network_per_sec_bytes: GaugeVec<U64>,
database_cache: Gauge<U64>,
state_cache: Gauge<U64>,
state_db: GaugeVec<U64>,
@@ -85,10 +85,6 @@ impl PrometheusMetrics {
)?, registry)?,
// I/ O
network_per_sec_bytes: register(GaugeVec::new(
Opts::new("network_per_sec_bytes", "Networking bytes per second"),
&["direction"]
)?, registry)?,
database_cache: register(Gauge::new(
"database_cache_bytes", "RocksDB cache size in bytes",
)?, registry)?,
@@ -105,11 +101,19 @@ impl PrometheusMetrics {
pub struct MetricsService {
metrics: Option<PrometheusMetrics>,
last_update: Instant,
last_total_bytes_inbound: u64,
last_total_bytes_outbound: u64,
}
impl MetricsService {
pub fn new() -> Self {
MetricsService { metrics: None }
MetricsService {
metrics: None,
last_total_bytes_inbound: 0,
last_total_bytes_outbound: 0,
last_update: Instant::now(),
}
}
pub fn with_prometheus(
@@ -129,7 +133,12 @@ impl MetricsService {
&config.impl_version,
role_bits,
)
.map(|p| MetricsService { metrics: Some(p) })
.map(|p| MetricsService {
metrics: Some(p),
last_total_bytes_inbound: 0,
last_total_bytes_outbound: 0,
last_update: Instant::now(),
})
}
pub fn tick<T: Block>(
@@ -138,16 +147,31 @@ impl MetricsService {
txpool_status: &PoolStatus,
net_status: &NetworkStatus<T>,
) {
let now = Instant::now();
let elapsed = (now - self.last_update).as_secs();
let best_number = info.chain.best_number.saturated_into::<u64>();
let best_hash = info.chain.best_hash;
let num_peers = net_status.num_connected_peers;
let finalized_number: u64 = info.chain.finalized_number.saturated_into::<u64>();
let bandwidth_download = net_status.average_download_per_sec;
let bandwidth_upload = net_status.average_upload_per_sec;
let total_bytes_inbound = net_status.total_bytes_inbound;
let total_bytes_outbound = net_status.total_bytes_outbound;
let best_seen_block = net_status
.best_seen_block
.map(|num: NumberFor<T>| num.unique_saturated_into() as u64);
let diff_bytes_inbound = total_bytes_inbound - self.last_total_bytes_inbound;
let diff_bytes_outbound = total_bytes_outbound - self.last_total_bytes_outbound;
let (avg_bytes_per_sec_inbound, avg_bytes_per_sec_outbound) =
if elapsed > 0 {
self.last_total_bytes_inbound = total_bytes_inbound;
self.last_total_bytes_outbound = total_bytes_outbound;
(diff_bytes_inbound / elapsed, diff_bytes_outbound / elapsed)
} else {
(diff_bytes_inbound, diff_bytes_outbound)
};
self.last_update = now;
telemetry!(
SUBSTRATE_INFO;
"system.interval";
@@ -157,8 +181,8 @@ impl MetricsService {
"txcount" => txpool_status.ready,
"finalized_height" => finalized_number,
"finalized_hash" => ?info.chain.finalized_hash,
"bandwidth_download" => bandwidth_download,
"bandwidth_upload" => bandwidth_upload,
"bandwidth_download" => avg_bytes_per_sec_inbound,
"bandwidth_upload" => avg_bytes_per_sec_outbound,
"used_state_cache_size" => info.usage.as_ref()
.map(|usage| usage.memory.state_cache.as_bytes())
.unwrap_or(0),
@@ -174,14 +198,6 @@ impl MetricsService {
);
if let Some(metrics) = self.metrics.as_ref() {
metrics
.network_per_sec_bytes
.with_label_values(&["download"])
.set(net_status.average_download_per_sec);
metrics
.network_per_sec_bytes
.with_label_values(&["upload"])
.set(net_status.average_upload_per_sec);
metrics
.block_height
.with_label_values(&["finalized"])
+1 -1
View File
@@ -18,7 +18,7 @@ parking_lot = "0.10.0"
futures = "0.3.4"
futures-timer = "3.0.1"
wasm-timer = "0.2.0"
libp2p = { version = "0.22.0", default-features = false, features = ["dns", "tcp-async-std", "wasm-ext", "websocket"] }
libp2p = { version = "0.23.0", default-features = false, features = ["dns", "tcp-async-std", "wasm-ext", "websocket"] }
log = "0.4.8"
pin-project = "0.4.6"
rand = "0.7.2"
@@ -15,7 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
derive_more = "0.99.2"
libp2p = { version = "0.22.0", default-features = false }
libp2p = { version = "0.23.0", default-features = false }
log = "0.4.8"
sp-core = { path= "../../core", version = "2.0.0-rc5"}
sp-inherents = { version = "2.0.0-rc5", path = "../../inherents" }