mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
Report the average network bandwidth (#1551)
* Report the average network bandwidth * Fix concerns
This commit is contained in:
Generated
+1
@@ -3833,6 +3833,7 @@ dependencies = [
|
||||
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libp2p 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
//! Console informant. Prints sync progress and block events. Runs on the calling thread.
|
||||
|
||||
use ansi_term::Colour;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{fmt, time::{Duration, Instant}};
|
||||
use futures::{Future, Stream};
|
||||
use service::{Service, Components};
|
||||
use tokio::runtime::TaskExecutor;
|
||||
@@ -64,9 +64,11 @@ pub fn start<C>(service: &Service<C>, exit: ::exit_future::Exit, handle: TaskExe
|
||||
last_number = Some(best_number);
|
||||
let txpool_status = txpool.status();
|
||||
let finalized_number: u64 = info.chain.finalized_number.as_();
|
||||
let bandwidth_download = network.average_download_per_sec();
|
||||
let bandwidth_upload = network.average_upload_per_sec();
|
||||
info!(
|
||||
target: "substrate",
|
||||
"{}{} ({} peers), best: #{} ({}), finalized #{} ({})",
|
||||
"{}{} ({} peers), best: #{} ({}), finalized #{} ({}), ⭳ {} ⭱ {}",
|
||||
Colour::White.bold().paint(&status),
|
||||
target,
|
||||
Colour::White.bold().paint(format!("{}", sync_status.num_peers)),
|
||||
@@ -74,6 +76,8 @@ pub fn start<C>(service: &Service<C>, exit: ::exit_future::Exit, handle: TaskExe
|
||||
best_hash,
|
||||
Colour::White.paint(format!("{}", finalized_number)),
|
||||
info.chain.finalized_hash,
|
||||
TransferRateFormat(bandwidth_download),
|
||||
TransferRateFormat(bandwidth_upload),
|
||||
);
|
||||
|
||||
// get cpu usage and memory usage of this process
|
||||
@@ -93,6 +97,8 @@ pub fn start<C>(service: &Service<C>, exit: ::exit_future::Exit, handle: TaskExe
|
||||
"memory" => memory,
|
||||
"finalized_height" => finalized_number,
|
||||
"finalized_hash" => ?info.chain.finalized_hash,
|
||||
"bandwidth_download" => bandwidth_download,
|
||||
"bandwidth_upload" => bandwidth_upload,
|
||||
);
|
||||
} else {
|
||||
warn!("Error getting best block information");
|
||||
@@ -159,3 +165,27 @@ fn speed(best_number: u64, last_number: Option<u64>) -> String {
|
||||
format!(" {:4.1} bps", speed / 10.0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Contains a number of bytes per second. Implements `fmt::Display` and shows this number of bytes
|
||||
/// per second in a nice way.
|
||||
struct TransferRateFormat(u64);
|
||||
impl fmt::Display for TransferRateFormat {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
// Special case 0.
|
||||
if self.0 == 0 {
|
||||
return write!(f, "0")
|
||||
}
|
||||
|
||||
// Under 0.1 kiB, display plain bytes.
|
||||
if self.0 < 100 {
|
||||
return write!(f, "{} B/s", self.0)
|
||||
}
|
||||
|
||||
// Under 1.0 MiB/sec, display the value in kiB/sec.
|
||||
if self.0 < 1024 * 1024 {
|
||||
return write!(f, "{:.1}kiB/s", self.0 as f64 / 1024.0)
|
||||
}
|
||||
|
||||
write!(f, "{:.1}MiB/s", self.0 as f64 / (1024.0 * 1024.0))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ fnv = "1.0"
|
||||
futures = "0.1"
|
||||
libp2p = { version = "0.2", default-features = false, features = ["secio-rsa", "secio-secp256k1", "libp2p-websocket"] }
|
||||
parking_lot = "0.7.1"
|
||||
lazy_static = "1.2"
|
||||
log = "0.4"
|
||||
rand = "0.5.0"
|
||||
serde = "1.0.70"
|
||||
|
||||
@@ -30,6 +30,7 @@ use std::fs;
|
||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio_timer::Interval;
|
||||
|
||||
@@ -68,11 +69,11 @@ where TProtos: IntoIterator<Item = RegisteredProtocol> {
|
||||
topology.add_external_addrs(config.public_addresses.clone().into_iter());
|
||||
|
||||
// Build the swarm.
|
||||
let mut swarm = {
|
||||
let (mut swarm, bandwidth) = {
|
||||
let registered_custom = RegisteredProtocols(registered_custom.into_iter().collect());
|
||||
let behaviour = Behaviour::new(&config, local_peer_id.clone(), registered_custom);
|
||||
let transport = transport::build_transport(local_private_key);
|
||||
Swarm::new(transport, behaviour, topology)
|
||||
let (transport, bandwidth) = transport::build_transport(local_private_key);
|
||||
(Swarm::new(transport, behaviour, topology), bandwidth)
|
||||
};
|
||||
|
||||
// Listen on multiaddresses.
|
||||
@@ -127,6 +128,7 @@ where TProtos: IntoIterator<Item = RegisteredProtocol> {
|
||||
|
||||
Ok(Service {
|
||||
swarm,
|
||||
bandwidth,
|
||||
nodes_info: Default::default(),
|
||||
index_by_id: Default::default(),
|
||||
next_node_id: 1,
|
||||
@@ -192,6 +194,9 @@ pub struct Service {
|
||||
/// Stream of events of the swarm.
|
||||
swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), IoError>, Behaviour<Substream<StreamMuxerBox>>, NetTopology>,
|
||||
|
||||
/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
|
||||
bandwidth: Arc<transport::BandwidthSinks>,
|
||||
|
||||
/// Information about all the nodes we're connected to.
|
||||
nodes_info: FnvHashMap<NodeIndex, NodeInfo>,
|
||||
|
||||
@@ -227,6 +232,18 @@ impl Service {
|
||||
Swarm::listeners(&self.swarm)
|
||||
}
|
||||
|
||||
/// Returns the downloaded bytes per second averaged over the past few seconds.
|
||||
#[inline]
|
||||
pub fn average_download_per_sec(&self) -> u64 {
|
||||
self.bandwidth.average_download_per_sec()
|
||||
}
|
||||
|
||||
/// Returns the uploaded bytes per second averaged over the past few seconds.
|
||||
#[inline]
|
||||
pub fn average_upload_per_sec(&self) -> u64 {
|
||||
self.bandwidth.average_upload_per_sec()
|
||||
}
|
||||
|
||||
/// Returns the peer id of the local node.
|
||||
#[inline]
|
||||
pub fn peer_id(&self) -> &PeerId {
|
||||
|
||||
@@ -17,12 +17,19 @@
|
||||
use futures::prelude::*;
|
||||
use libp2p::{InboundUpgradeExt, OutboundUpgradeExt, PeerId, Transport, mplex, secio, yamux, tcp, dns, websocket};
|
||||
use libp2p::core::{self, transport::boxed::Boxed, muxing::StreamMuxerBox};
|
||||
use std::{io, time::Duration, usize};
|
||||
use std::{io, sync::Arc, time::Duration, usize};
|
||||
|
||||
pub use self::bandwidth::BandwidthSinks;
|
||||
|
||||
mod bandwidth;
|
||||
|
||||
/// Builds the transport that serves as a common ground for all connections.
|
||||
///
|
||||
/// Returns a `BandwidthSinks` object that allows querying the average bandwidth produced by all
|
||||
/// the connections spawned with this transport.
|
||||
pub fn build_transport(
|
||||
local_private_key: secio::SecioKeyPair
|
||||
) -> Boxed<(PeerId, StreamMuxerBox), io::Error> {
|
||||
) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc<bandwidth::BandwidthSinks>) {
|
||||
let mut mplex_config = mplex::MplexConfig::new();
|
||||
mplex_config.max_buffer_len_behaviour(mplex::MaxBufferBehaviour::Block);
|
||||
mplex_config.max_buffer_len(usize::MAX);
|
||||
@@ -30,9 +37,10 @@ pub fn build_transport(
|
||||
let transport = tcp::TcpConfig::new();
|
||||
let transport = websocket::WsConfig::new(transport.clone()).or_transport(transport);
|
||||
let transport = dns::DnsConfig::new(transport);
|
||||
let (transport, sinks) = bandwidth::BandwidthLogging::new(transport, 5);
|
||||
|
||||
// TODO: rework the transport creation (https://github.com/libp2p/rust-libp2p/issues/783)
|
||||
transport
|
||||
let transport = transport
|
||||
.with_upgrade(secio::SecioConfig::new(local_private_key))
|
||||
.and_then(move |out, endpoint| {
|
||||
let peer_id = out.remote_key.into_peer_id();
|
||||
@@ -46,5 +54,7 @@ pub fn build_transport(
|
||||
})
|
||||
.with_timeout(Duration::from_secs(20))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
||||
.boxed()
|
||||
.boxed();
|
||||
|
||||
(transport, sinks)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,262 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use futures::{prelude::*, try_ready};
|
||||
use lazy_static::lazy_static;
|
||||
use libp2p::{Multiaddr, core::Transport, core::transport::TransportError};
|
||||
use parking_lot::Mutex;
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use std::{io, io::Read, io::Write, sync::Arc, time::Instant};
|
||||
|
||||
/// Wraps around a `Transport` and logs the bandwidth that goes through all the opened connections.
|
||||
#[derive(Clone)]
|
||||
pub struct BandwidthLogging<TInner> {
|
||||
inner: TInner,
|
||||
sinks: Arc<BandwidthSinks>,
|
||||
}
|
||||
|
||||
impl<TInner> BandwidthLogging<TInner> {
|
||||
/// Creates a new `BandwidthLogging` around the transport.
|
||||
#[inline]
|
||||
pub fn new(inner: TInner, period_seconds: u32) -> (Self, Arc<BandwidthSinks>) {
|
||||
let sink = Arc::new(BandwidthSinks {
|
||||
download: Mutex::new(BandwidthSink::new(period_seconds)),
|
||||
upload: Mutex::new(BandwidthSink::new(period_seconds)),
|
||||
});
|
||||
|
||||
let trans = BandwidthLogging {
|
||||
inner,
|
||||
sinks: sink.clone(),
|
||||
};
|
||||
|
||||
(trans, sink)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TInner> Transport for BandwidthLogging<TInner>
|
||||
where
|
||||
TInner: Transport,
|
||||
{
|
||||
type Output = BandwidthConnecLogging<TInner::Output>;
|
||||
type Error = TInner::Error;
|
||||
type Listener = BandwidthListener<TInner::Listener>;
|
||||
type ListenerUpgrade = BandwidthFuture<TInner::ListenerUpgrade>;
|
||||
type Dial = BandwidthFuture<TInner::Dial>;
|
||||
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
|
||||
let sinks = self.sinks;
|
||||
self.inner
|
||||
.listen_on(addr)
|
||||
.map(|(inner, new_addr)| (BandwidthListener { inner, sinks }, new_addr))
|
||||
}
|
||||
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
||||
let sinks = self.sinks;
|
||||
self.inner
|
||||
.dial(addr)
|
||||
.map(move |fut| BandwidthFuture {
|
||||
inner: fut,
|
||||
sinks,
|
||||
})
|
||||
}
|
||||
|
||||
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
|
||||
self.inner.nat_traversal(server, observed)
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps around a `Stream` that produces connections. Wraps each connection around a bandwidth
|
||||
/// counter.
|
||||
pub struct BandwidthListener<TInner> {
|
||||
inner: TInner,
|
||||
sinks: Arc<BandwidthSinks>,
|
||||
}
|
||||
|
||||
impl<TInner, TConn> Stream for BandwidthListener<TInner>
|
||||
where TInner: Stream<Item = (TConn, Multiaddr)>,
|
||||
{
|
||||
type Item = (BandwidthFuture<TConn>, Multiaddr);
|
||||
type Error = TInner::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
let (inner, addr) = match try_ready!(self.inner.poll()) {
|
||||
Some(v) => v,
|
||||
None => return Ok(Async::Ready(None))
|
||||
};
|
||||
|
||||
let fut = BandwidthFuture {
|
||||
inner,
|
||||
sinks: self.sinks.clone(),
|
||||
};
|
||||
|
||||
Ok(Async::Ready(Some((fut, addr))))
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps around a `Future` that produces a connection. Wraps the connection around a bandwidth
|
||||
/// counter.
|
||||
pub struct BandwidthFuture<TInner> {
|
||||
inner: TInner,
|
||||
sinks: Arc<BandwidthSinks>,
|
||||
}
|
||||
|
||||
impl<TInner> Future for BandwidthFuture<TInner>
|
||||
where TInner: Future,
|
||||
{
|
||||
type Item = BandwidthConnecLogging<TInner::Item>;
|
||||
type Error = TInner::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let inner = try_ready!(self.inner.poll());
|
||||
Ok(Async::Ready(BandwidthConnecLogging {
|
||||
inner,
|
||||
sinks: self.sinks.clone(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
/// Allows obtaining the average bandwidth of the connections created from a `BandwidthLogging`.
|
||||
pub struct BandwidthSinks {
|
||||
download: Mutex<BandwidthSink>,
|
||||
upload: Mutex<BandwidthSink>,
|
||||
}
|
||||
|
||||
impl BandwidthSinks {
|
||||
/// Returns the average number of bytes that have been downloaded in the period.
|
||||
#[inline]
|
||||
pub fn average_download_per_sec(&self) -> u64 {
|
||||
self.download.lock().get()
|
||||
}
|
||||
|
||||
/// Returns the average number of bytes that have been uploaded in the period.
|
||||
#[inline]
|
||||
pub fn average_upload_per_sec(&self) -> u64 {
|
||||
self.upload.lock().get()
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps around an `AsyncRead + AsyncWrite` and logs the bandwidth that goes through it.
|
||||
pub struct BandwidthConnecLogging<TInner> {
|
||||
inner: TInner,
|
||||
sinks: Arc<BandwidthSinks>,
|
||||
}
|
||||
|
||||
impl<TInner> Read for BandwidthConnecLogging<TInner>
|
||||
where TInner: Read
|
||||
{
|
||||
#[inline]
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
let num_bytes = self.inner.read(buf)?;
|
||||
self.sinks.download.lock().inject(num_bytes);
|
||||
Ok(num_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TInner> tokio_io::AsyncRead for BandwidthConnecLogging<TInner>
|
||||
where TInner: tokio_io::AsyncRead
|
||||
{
|
||||
}
|
||||
|
||||
impl<TInner> Write for BandwidthConnecLogging<TInner>
|
||||
where TInner: Write
|
||||
{
|
||||
#[inline]
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
let num_bytes = self.inner.write(buf)?;
|
||||
self.sinks.upload.lock().inject(num_bytes);
|
||||
Ok(num_bytes)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.inner.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl<TInner> tokio_io::AsyncWrite for BandwidthConnecLogging<TInner>
|
||||
where TInner: tokio_io::AsyncWrite
|
||||
{
|
||||
#[inline]
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
self.inner.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of seconds that have elapsed between an arbitrary EPOCH and now.
|
||||
#[inline]
|
||||
fn current_second() -> u32 {
|
||||
lazy_static! {
|
||||
static ref EPOCH: Instant = Instant::now();
|
||||
}
|
||||
|
||||
EPOCH.elapsed().as_secs() as u32
|
||||
}
|
||||
|
||||
/// Structure that calculates the average bandwidth over the last few seconds.
|
||||
///
|
||||
/// If you want to calculate for example both download and upload bandwidths, create two different
|
||||
/// objects.
|
||||
struct BandwidthSink {
|
||||
/// Bytes sent over the past seconds. Contains `rolling_seconds + 1` elements. Only the first
|
||||
/// `rolling_seconds` elements are taken into account for the average, while the last element
|
||||
/// is the element to be inserted later.
|
||||
bytes: SmallVec<[u64; 8]>,
|
||||
/// Number of seconds between `EPOCH` and the moment we have last updated `bytes`.
|
||||
latest_update: u32,
|
||||
/// Number of seconds. Configured at initialization and never modified.
|
||||
rolling_seconds: u32,
|
||||
}
|
||||
|
||||
impl BandwidthSink {
|
||||
/// Initializes a `BandwidthSink`.
|
||||
fn new(seconds: u32) -> Self {
|
||||
BandwidthSink {
|
||||
bytes: smallvec![0; seconds as usize + 1],
|
||||
latest_update: current_second(),
|
||||
rolling_seconds: seconds,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of bytes over the last few seconds. The number of seconds is the value
|
||||
/// configured at initialization.
|
||||
fn get(&mut self) -> u64 {
|
||||
self.update();
|
||||
self.bytes.iter()
|
||||
.take(self.rolling_seconds.saturating_sub(1) as usize)
|
||||
.fold(0u64, |a, &b| a.saturating_add(b)) / u64::from(self.rolling_seconds)
|
||||
}
|
||||
|
||||
/// Notifies the `BandwidthSink` that a certain number of bytes have been transmitted at this
|
||||
/// moment.
|
||||
fn inject(&mut self, bytes: usize) {
|
||||
self.update();
|
||||
if let Some(last) = self.bytes.last_mut() {
|
||||
*last = last.saturating_add(bytes as u64);
|
||||
}
|
||||
}
|
||||
|
||||
/// Updates the state of the `BandwidthSink` so that the last element of `bytes` contains the
|
||||
/// current second.
|
||||
fn update(&mut self) {
|
||||
let current_second = current_second();
|
||||
for _ in self.latest_update .. current_second {
|
||||
self.bytes.remove(0);
|
||||
self.bytes.push(0);
|
||||
}
|
||||
|
||||
self.latest_update = current_second;
|
||||
}
|
||||
}
|
||||
@@ -173,6 +173,18 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Service<B, S,
|
||||
Ok(service)
|
||||
}
|
||||
|
||||
/// Returns the downloaded bytes per second averaged over the past few seconds.
|
||||
#[inline]
|
||||
pub fn average_download_per_sec(&self) -> u64 {
|
||||
self.network.lock().average_download_per_sec()
|
||||
}
|
||||
|
||||
/// Returns the uploaded bytes per second averaged over the past few seconds.
|
||||
#[inline]
|
||||
pub fn average_upload_per_sec(&self) -> u64 {
|
||||
self.network.lock().average_upload_per_sec()
|
||||
}
|
||||
|
||||
/// Called when a new block is imported by the client.
|
||||
pub fn on_block_imported(&self, hash: B::Hash, header: &B::Header) {
|
||||
self.handler.on_block_imported(&mut NetSyncIo::new(&self.network, self.protocol_id), hash, header)
|
||||
|
||||
Reference in New Issue
Block a user