From 22aa8482ae965f9933c3bc5cd7e08bee5c466925 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Thu, 21 Feb 2019 18:19:25 +0000 Subject: [PATCH] Use BandwidthLogging struct from libp2p (#1847) * Delete existing * Use bandwidth from libp2p --- .../core/network-libp2p/src/transport.rs | 9 +- .../network-libp2p/src/transport/bandwidth.rs | 262 ------------------ 2 files changed, 5 insertions(+), 266 deletions(-) delete mode 100644 substrate/core/network-libp2p/src/transport/bandwidth.rs diff --git a/substrate/core/network-libp2p/src/transport.rs b/substrate/core/network-libp2p/src/transport.rs index 66fd40584c..c095602a11 100644 --- a/substrate/core/network-libp2p/src/transport.rs +++ b/substrate/core/network-libp2p/src/transport.rs @@ -15,14 +15,15 @@ // along with Substrate. If not, see . use futures::prelude::*; -use libp2p::{InboundUpgradeExt, OutboundUpgradeExt, PeerId, Transport, mplex, secio, yamux, tcp, dns, websocket}; +use libp2p::{ + InboundUpgradeExt, OutboundUpgradeExt, PeerId, Transport, + mplex, secio, yamux, tcp, dns, websocket, bandwidth +}; use libp2p::core::{self, transport::boxed::Boxed, muxing::StreamMuxerBox}; use std::{io, sync::Arc, time::Duration, usize}; pub use self::bandwidth::BandwidthSinks; -mod bandwidth; - /// Builds the transport that serves as a common ground for all connections. /// /// Returns a `BandwidthSinks` object that allows querying the average bandwidth produced by all @@ -37,7 +38,7 @@ pub fn build_transport( let transport = tcp::TcpConfig::new(); let transport = websocket::WsConfig::new(transport.clone()).or_transport(transport); let transport = dns::DnsConfig::new(transport); - let (transport, sinks) = bandwidth::BandwidthLogging::new(transport, 5); + let (transport, sinks) = bandwidth::BandwidthLogging::new(transport, Duration::from_secs(5)); // TODO: rework the transport creation (https://github.com/libp2p/rust-libp2p/issues/783) let transport = transport diff --git a/substrate/core/network-libp2p/src/transport/bandwidth.rs b/substrate/core/network-libp2p/src/transport/bandwidth.rs deleted file mode 100644 index 23ec064918..0000000000 --- a/substrate/core/network-libp2p/src/transport/bandwidth.rs +++ /dev/null @@ -1,262 +0,0 @@ -// Copyright 2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -use futures::{prelude::*, try_ready}; -use lazy_static::lazy_static; -use libp2p::{Multiaddr, core::Transport, core::transport::TransportError}; -use parking_lot::Mutex; -use smallvec::{smallvec, SmallVec}; -use std::{io, io::Read, io::Write, sync::Arc, time::Instant}; - -/// Wraps around a `Transport` and logs the bandwidth that goes through all the opened connections. -#[derive(Clone)] -pub struct BandwidthLogging { - inner: TInner, - sinks: Arc, -} - -impl BandwidthLogging { - /// Creates a new `BandwidthLogging` around the transport. - #[inline] - pub fn new(inner: TInner, period_seconds: u32) -> (Self, Arc) { - let sink = Arc::new(BandwidthSinks { - download: Mutex::new(BandwidthSink::new(period_seconds)), - upload: Mutex::new(BandwidthSink::new(period_seconds)), - }); - - let trans = BandwidthLogging { - inner, - sinks: sink.clone(), - }; - - (trans, sink) - } -} - -impl Transport for BandwidthLogging -where - TInner: Transport, -{ - type Output = BandwidthConnecLogging; - type Error = TInner::Error; - type Listener = BandwidthListener; - type ListenerUpgrade = BandwidthFuture; - type Dial = BandwidthFuture; - - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError> { - let sinks = self.sinks; - self.inner - .listen_on(addr) - .map(|(inner, new_addr)| (BandwidthListener { inner, sinks }, new_addr)) - } - - fn dial(self, addr: Multiaddr) -> Result> { - let sinks = self.sinks; - self.inner - .dial(addr) - .map(move |fut| BandwidthFuture { - inner: fut, - sinks, - }) - } - - fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.nat_traversal(server, observed) - } -} - -/// Wraps around a `Stream` that produces connections. Wraps each connection around a bandwidth -/// counter. -pub struct BandwidthListener { - inner: TInner, - sinks: Arc, -} - -impl Stream for BandwidthListener -where TInner: Stream, -{ - type Item = (BandwidthFuture, Multiaddr); - type Error = TInner::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - let (inner, addr) = match try_ready!(self.inner.poll()) { - Some(v) => v, - None => return Ok(Async::Ready(None)) - }; - - let fut = BandwidthFuture { - inner, - sinks: self.sinks.clone(), - }; - - Ok(Async::Ready(Some((fut, addr)))) - } -} - -/// Wraps around a `Future` that produces a connection. Wraps the connection around a bandwidth -/// counter. -pub struct BandwidthFuture { - inner: TInner, - sinks: Arc, -} - -impl Future for BandwidthFuture - where TInner: Future, -{ - type Item = BandwidthConnecLogging; - type Error = TInner::Error; - - fn poll(&mut self) -> Poll { - let inner = try_ready!(self.inner.poll()); - Ok(Async::Ready(BandwidthConnecLogging { - inner, - sinks: self.sinks.clone(), - })) - } -} - -/// Allows obtaining the average bandwidth of the connections created from a `BandwidthLogging`. -pub struct BandwidthSinks { - download: Mutex, - upload: Mutex, -} - -impl BandwidthSinks { - /// Returns the average number of bytes that have been downloaded in the period. - #[inline] - pub fn average_download_per_sec(&self) -> u64 { - self.download.lock().get() - } - - /// Returns the average number of bytes that have been uploaded in the period. - #[inline] - pub fn average_upload_per_sec(&self) -> u64 { - self.upload.lock().get() - } -} - -/// Wraps around an `AsyncRead + AsyncWrite` and logs the bandwidth that goes through it. -pub struct BandwidthConnecLogging { - inner: TInner, - sinks: Arc, -} - -impl Read for BandwidthConnecLogging - where TInner: Read -{ - #[inline] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let num_bytes = self.inner.read(buf)?; - self.sinks.download.lock().inject(num_bytes); - Ok(num_bytes) - } -} - -impl tokio_io::AsyncRead for BandwidthConnecLogging - where TInner: tokio_io::AsyncRead -{ -} - -impl Write for BandwidthConnecLogging - where TInner: Write -{ - #[inline] - fn write(&mut self, buf: &[u8]) -> io::Result { - let num_bytes = self.inner.write(buf)?; - self.sinks.upload.lock().inject(num_bytes); - Ok(num_bytes) - } - - #[inline] - fn flush(&mut self) -> io::Result<()> { - self.inner.flush() - } -} - -impl tokio_io::AsyncWrite for BandwidthConnecLogging - where TInner: tokio_io::AsyncWrite -{ - #[inline] - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.inner.shutdown() - } -} - -/// Returns the number of seconds that have elapsed between an arbitrary EPOCH and now. -#[inline] -fn current_second() -> u32 { - lazy_static! { - static ref EPOCH: Instant = Instant::now(); - } - - EPOCH.elapsed().as_secs() as u32 -} - -/// Structure that calculates the average bandwidth over the last few seconds. -/// -/// If you want to calculate for example both download and upload bandwidths, create two different -/// objects. -struct BandwidthSink { - /// Bytes sent over the past seconds. Contains `rolling_seconds + 1` elements. Only the first - /// `rolling_seconds` elements are taken into account for the average, while the last element - /// is the element to be inserted later. - bytes: SmallVec<[u64; 8]>, - /// Number of seconds between `EPOCH` and the moment we have last updated `bytes`. - latest_update: u32, - /// Number of seconds. Configured at initialization and never modified. - rolling_seconds: u32, -} - -impl BandwidthSink { - /// Initializes a `BandwidthSink`. - fn new(seconds: u32) -> Self { - BandwidthSink { - bytes: smallvec![0; seconds as usize + 1], - latest_update: current_second(), - rolling_seconds: seconds, - } - } - - /// Returns the number of bytes over the last few seconds. The number of seconds is the value - /// configured at initialization. - fn get(&mut self) -> u64 { - self.update(); - self.bytes.iter() - .take(self.rolling_seconds.saturating_sub(1) as usize) - .fold(0u64, |a, &b| a.saturating_add(b)) / u64::from(self.rolling_seconds) - } - - /// Notifies the `BandwidthSink` that a certain number of bytes have been transmitted at this - /// moment. - fn inject(&mut self, bytes: usize) { - self.update(); - if let Some(last) = self.bytes.last_mut() { - *last = last.saturating_add(bytes as u64); - } - } - - /// Updates the state of the `BandwidthSink` so that the last element of `bytes` contains the - /// current second. - fn update(&mut self) { - let current_second = current_second(); - for _ in self.latest_update .. current_second { - self.bytes.remove(0); - self.bytes.push(0); - } - - self.latest_update = current_second; - } -}