From a1586df41b110440c376410799ecf676c65a93de Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 25 Jan 2019 11:34:34 +0100 Subject: [PATCH] Report the average network bandwidth (#1551) * Report the average network bandwidth * Fix concerns --- substrate/Cargo.lock | 1 + substrate/core/cli/src/informant.rs | 34 ++- substrate/core/network-libp2p/Cargo.toml | 1 + .../core/network-libp2p/src/service_task.rs | 23 +- .../core/network-libp2p/src/transport.rs | 18 +- .../network-libp2p/src/transport/bandwidth.rs | 262 ++++++++++++++++++ substrate/core/network/src/service.rs | 12 + 7 files changed, 342 insertions(+), 9 deletions(-) create mode 100644 substrate/core/network-libp2p/src/transport/bandwidth.rs diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 99ebbf44cc..a108af11f7 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -3833,6 +3833,7 @@ dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/core/cli/src/informant.rs b/substrate/core/cli/src/informant.rs index deda6ab7b6..61fa618e92 100644 --- a/substrate/core/cli/src/informant.rs +++ b/substrate/core/cli/src/informant.rs @@ -17,7 +17,7 @@ //! Console informant. Prints sync progress and block events. Runs on the calling thread. use ansi_term::Colour; -use std::time::{Duration, Instant}; +use std::{fmt, time::{Duration, Instant}}; use futures::{Future, Stream}; use service::{Service, Components}; use tokio::runtime::TaskExecutor; @@ -64,9 +64,11 @@ pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExe last_number = Some(best_number); let txpool_status = txpool.status(); let finalized_number: u64 = info.chain.finalized_number.as_(); + let bandwidth_download = network.average_download_per_sec(); + let bandwidth_upload = network.average_upload_per_sec(); info!( target: "substrate", - "{}{} ({} peers), best: #{} ({}), finalized #{} ({})", + "{}{} ({} peers), best: #{} ({}), finalized #{} ({}), ⭳ {} ⭱ {}", Colour::White.bold().paint(&status), target, Colour::White.bold().paint(format!("{}", sync_status.num_peers)), @@ -74,6 +76,8 @@ pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExe best_hash, Colour::White.paint(format!("{}", finalized_number)), info.chain.finalized_hash, + TransferRateFormat(bandwidth_download), + TransferRateFormat(bandwidth_upload), ); // get cpu usage and memory usage of this process @@ -93,6 +97,8 @@ pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExe "memory" => memory, "finalized_height" => finalized_number, "finalized_hash" => ?info.chain.finalized_hash, + "bandwidth_download" => bandwidth_download, + "bandwidth_upload" => bandwidth_upload, ); } else { warn!("Error getting best block information"); @@ -159,3 +165,27 @@ fn speed(best_number: u64, last_number: Option) -> String { format!(" {:4.1} bps", speed / 10.0) } } + +/// Contains a number of bytes per second. Implements `fmt::Display` and shows this number of bytes +/// per second in a nice way. +struct TransferRateFormat(u64); +impl fmt::Display for TransferRateFormat { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + // Special case 0. + if self.0 == 0 { + return write!(f, "0") + } + + // Under 0.1 kiB, display plain bytes. + if self.0 < 100 { + return write!(f, "{} B/s", self.0) + } + + // Under 1.0 MiB/sec, display the value in kiB/sec. + if self.0 < 1024 * 1024 { + return write!(f, "{:.1}kiB/s", self.0 as f64 / 1024.0) + } + + write!(f, "{:.1}MiB/s", self.0 as f64 / (1024.0 * 1024.0)) + } +} diff --git a/substrate/core/network-libp2p/Cargo.toml b/substrate/core/network-libp2p/Cargo.toml index 4b2ad770e0..d5fcb72d11 100644 --- a/substrate/core/network-libp2p/Cargo.toml +++ b/substrate/core/network-libp2p/Cargo.toml @@ -14,6 +14,7 @@ fnv = "1.0" futures = "0.1" libp2p = { version = "0.2", default-features = false, features = ["secio-rsa", "secio-secp256k1", "libp2p-websocket"] } parking_lot = "0.7.1" +lazy_static = "1.2" log = "0.4" rand = "0.5.0" serde = "1.0.70" diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs index 390bb86ebc..dea554e89e 100644 --- a/substrate/core/network-libp2p/src/service_task.rs +++ b/substrate/core/network-libp2p/src/service_task.rs @@ -30,6 +30,7 @@ use std::fs; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::net::SocketAddr; use std::path::Path; +use std::sync::Arc; use std::time::Duration; use tokio_timer::Interval; @@ -68,11 +69,11 @@ where TProtos: IntoIterator { topology.add_external_addrs(config.public_addresses.clone().into_iter()); // Build the swarm. - let mut swarm = { + let (mut swarm, bandwidth) = { let registered_custom = RegisteredProtocols(registered_custom.into_iter().collect()); let behaviour = Behaviour::new(&config, local_peer_id.clone(), registered_custom); - let transport = transport::build_transport(local_private_key); - Swarm::new(transport, behaviour, topology) + let (transport, bandwidth) = transport::build_transport(local_private_key); + (Swarm::new(transport, behaviour, topology), bandwidth) }; // Listen on multiaddresses. @@ -127,6 +128,7 @@ where TProtos: IntoIterator { Ok(Service { swarm, + bandwidth, nodes_info: Default::default(), index_by_id: Default::default(), next_node_id: 1, @@ -192,6 +194,9 @@ pub struct Service { /// Stream of events of the swarm. swarm: Swarm, Behaviour>, NetTopology>, + /// Bandwidth logging system. Can be queried to know the average bandwidth consumed. + bandwidth: Arc, + /// Information about all the nodes we're connected to. nodes_info: FnvHashMap, @@ -227,6 +232,18 @@ impl Service { Swarm::listeners(&self.swarm) } + /// Returns the downloaded bytes per second averaged over the past few seconds. + #[inline] + pub fn average_download_per_sec(&self) -> u64 { + self.bandwidth.average_download_per_sec() + } + + /// Returns the uploaded bytes per second averaged over the past few seconds. + #[inline] + pub fn average_upload_per_sec(&self) -> u64 { + self.bandwidth.average_upload_per_sec() + } + /// Returns the peer id of the local node. #[inline] pub fn peer_id(&self) -> &PeerId { diff --git a/substrate/core/network-libp2p/src/transport.rs b/substrate/core/network-libp2p/src/transport.rs index b7fd915e50..66fd40584c 100644 --- a/substrate/core/network-libp2p/src/transport.rs +++ b/substrate/core/network-libp2p/src/transport.rs @@ -17,12 +17,19 @@ use futures::prelude::*; use libp2p::{InboundUpgradeExt, OutboundUpgradeExt, PeerId, Transport, mplex, secio, yamux, tcp, dns, websocket}; use libp2p::core::{self, transport::boxed::Boxed, muxing::StreamMuxerBox}; -use std::{io, time::Duration, usize}; +use std::{io, sync::Arc, time::Duration, usize}; + +pub use self::bandwidth::BandwidthSinks; + +mod bandwidth; /// Builds the transport that serves as a common ground for all connections. +/// +/// Returns a `BandwidthSinks` object that allows querying the average bandwidth produced by all +/// the connections spawned with this transport. pub fn build_transport( local_private_key: secio::SecioKeyPair -) -> Boxed<(PeerId, StreamMuxerBox), io::Error> { +) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc) { let mut mplex_config = mplex::MplexConfig::new(); mplex_config.max_buffer_len_behaviour(mplex::MaxBufferBehaviour::Block); mplex_config.max_buffer_len(usize::MAX); @@ -30,9 +37,10 @@ pub fn build_transport( let transport = tcp::TcpConfig::new(); let transport = websocket::WsConfig::new(transport.clone()).or_transport(transport); let transport = dns::DnsConfig::new(transport); + let (transport, sinks) = bandwidth::BandwidthLogging::new(transport, 5); // TODO: rework the transport creation (https://github.com/libp2p/rust-libp2p/issues/783) - transport + let transport = transport .with_upgrade(secio::SecioConfig::new(local_private_key)) .and_then(move |out, endpoint| { let peer_id = out.remote_key.into_peer_id(); @@ -46,5 +54,7 @@ pub fn build_transport( }) .with_timeout(Duration::from_secs(20)) .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) - .boxed() + .boxed(); + + (transport, sinks) } diff --git a/substrate/core/network-libp2p/src/transport/bandwidth.rs b/substrate/core/network-libp2p/src/transport/bandwidth.rs new file mode 100644 index 0000000000..23ec064918 --- /dev/null +++ b/substrate/core/network-libp2p/src/transport/bandwidth.rs @@ -0,0 +1,262 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use futures::{prelude::*, try_ready}; +use lazy_static::lazy_static; +use libp2p::{Multiaddr, core::Transport, core::transport::TransportError}; +use parking_lot::Mutex; +use smallvec::{smallvec, SmallVec}; +use std::{io, io::Read, io::Write, sync::Arc, time::Instant}; + +/// Wraps around a `Transport` and logs the bandwidth that goes through all the opened connections. +#[derive(Clone)] +pub struct BandwidthLogging { + inner: TInner, + sinks: Arc, +} + +impl BandwidthLogging { + /// Creates a new `BandwidthLogging` around the transport. + #[inline] + pub fn new(inner: TInner, period_seconds: u32) -> (Self, Arc) { + let sink = Arc::new(BandwidthSinks { + download: Mutex::new(BandwidthSink::new(period_seconds)), + upload: Mutex::new(BandwidthSink::new(period_seconds)), + }); + + let trans = BandwidthLogging { + inner, + sinks: sink.clone(), + }; + + (trans, sink) + } +} + +impl Transport for BandwidthLogging +where + TInner: Transport, +{ + type Output = BandwidthConnecLogging; + type Error = TInner::Error; + type Listener = BandwidthListener; + type ListenerUpgrade = BandwidthFuture; + type Dial = BandwidthFuture; + + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError> { + let sinks = self.sinks; + self.inner + .listen_on(addr) + .map(|(inner, new_addr)| (BandwidthListener { inner, sinks }, new_addr)) + } + + fn dial(self, addr: Multiaddr) -> Result> { + let sinks = self.sinks; + self.inner + .dial(addr) + .map(move |fut| BandwidthFuture { + inner: fut, + sinks, + }) + } + + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.nat_traversal(server, observed) + } +} + +/// Wraps around a `Stream` that produces connections. Wraps each connection around a bandwidth +/// counter. +pub struct BandwidthListener { + inner: TInner, + sinks: Arc, +} + +impl Stream for BandwidthListener +where TInner: Stream, +{ + type Item = (BandwidthFuture, Multiaddr); + type Error = TInner::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let (inner, addr) = match try_ready!(self.inner.poll()) { + Some(v) => v, + None => return Ok(Async::Ready(None)) + }; + + let fut = BandwidthFuture { + inner, + sinks: self.sinks.clone(), + }; + + Ok(Async::Ready(Some((fut, addr)))) + } +} + +/// Wraps around a `Future` that produces a connection. Wraps the connection around a bandwidth +/// counter. +pub struct BandwidthFuture { + inner: TInner, + sinks: Arc, +} + +impl Future for BandwidthFuture + where TInner: Future, +{ + type Item = BandwidthConnecLogging; + type Error = TInner::Error; + + fn poll(&mut self) -> Poll { + let inner = try_ready!(self.inner.poll()); + Ok(Async::Ready(BandwidthConnecLogging { + inner, + sinks: self.sinks.clone(), + })) + } +} + +/// Allows obtaining the average bandwidth of the connections created from a `BandwidthLogging`. +pub struct BandwidthSinks { + download: Mutex, + upload: Mutex, +} + +impl BandwidthSinks { + /// Returns the average number of bytes that have been downloaded in the period. + #[inline] + pub fn average_download_per_sec(&self) -> u64 { + self.download.lock().get() + } + + /// Returns the average number of bytes that have been uploaded in the period. + #[inline] + pub fn average_upload_per_sec(&self) -> u64 { + self.upload.lock().get() + } +} + +/// Wraps around an `AsyncRead + AsyncWrite` and logs the bandwidth that goes through it. +pub struct BandwidthConnecLogging { + inner: TInner, + sinks: Arc, +} + +impl Read for BandwidthConnecLogging + where TInner: Read +{ + #[inline] + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let num_bytes = self.inner.read(buf)?; + self.sinks.download.lock().inject(num_bytes); + Ok(num_bytes) + } +} + +impl tokio_io::AsyncRead for BandwidthConnecLogging + where TInner: tokio_io::AsyncRead +{ +} + +impl Write for BandwidthConnecLogging + where TInner: Write +{ + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result { + let num_bytes = self.inner.write(buf)?; + self.sinks.upload.lock().inject(num_bytes); + Ok(num_bytes) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } +} + +impl tokio_io::AsyncWrite for BandwidthConnecLogging + where TInner: tokio_io::AsyncWrite +{ + #[inline] + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.inner.shutdown() + } +} + +/// Returns the number of seconds that have elapsed between an arbitrary EPOCH and now. +#[inline] +fn current_second() -> u32 { + lazy_static! { + static ref EPOCH: Instant = Instant::now(); + } + + EPOCH.elapsed().as_secs() as u32 +} + +/// Structure that calculates the average bandwidth over the last few seconds. +/// +/// If you want to calculate for example both download and upload bandwidths, create two different +/// objects. +struct BandwidthSink { + /// Bytes sent over the past seconds. Contains `rolling_seconds + 1` elements. Only the first + /// `rolling_seconds` elements are taken into account for the average, while the last element + /// is the element to be inserted later. + bytes: SmallVec<[u64; 8]>, + /// Number of seconds between `EPOCH` and the moment we have last updated `bytes`. + latest_update: u32, + /// Number of seconds. Configured at initialization and never modified. + rolling_seconds: u32, +} + +impl BandwidthSink { + /// Initializes a `BandwidthSink`. + fn new(seconds: u32) -> Self { + BandwidthSink { + bytes: smallvec![0; seconds as usize + 1], + latest_update: current_second(), + rolling_seconds: seconds, + } + } + + /// Returns the number of bytes over the last few seconds. The number of seconds is the value + /// configured at initialization. + fn get(&mut self) -> u64 { + self.update(); + self.bytes.iter() + .take(self.rolling_seconds.saturating_sub(1) as usize) + .fold(0u64, |a, &b| a.saturating_add(b)) / u64::from(self.rolling_seconds) + } + + /// Notifies the `BandwidthSink` that a certain number of bytes have been transmitted at this + /// moment. + fn inject(&mut self, bytes: usize) { + self.update(); + if let Some(last) = self.bytes.last_mut() { + *last = last.saturating_add(bytes as u64); + } + } + + /// Updates the state of the `BandwidthSink` so that the last element of `bytes` contains the + /// current second. + fn update(&mut self) { + let current_second = current_second(); + for _ in self.latest_update .. current_second { + self.bytes.remove(0); + self.bytes.push(0); + } + + self.latest_update = current_second; + } +} diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 952fb4cc8f..91840063b2 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -173,6 +173,18 @@ impl, H: ExHashT> Service u64 { + self.network.lock().average_download_per_sec() + } + + /// Returns the uploaded bytes per second averaged over the past few seconds. + #[inline] + pub fn average_upload_per_sec(&self) -> u64 { + self.network.lock().average_upload_per_sec() + } + /// Called when a new block is imported by the client. pub fn on_block_imported(&self, hash: B::Hash, header: &B::Header) { self.handler.on_block_imported(&mut NetSyncIo::new(&self.network, self.protocol_id), hash, header)