Skip to content

Commit

Permalink
Refactor the bandwidth logging to be less magic (#1670)
Browse files Browse the repository at this point in the history
* Refactor the bandwidth logging to be less magic

* Apply suggestions from code review

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* Complete renaming.

* Update changelog.

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
Co-authored-by: Roman S. Borschel <roman@parity.io>
  • Loading branch information
3 people authored Jul 27, 2020
1 parent 5291175 commit 8a08f72
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 126 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
- [`parity-multiaddr` CHANGELOG](misc/multiaddr/CHANGELOG.md)
- [`libp2p-core-derive` CHANGELOG](misc/core-derive/CHANGELOG.md)

# Version 0.23.0 (2020-??-??)

- Refactored bandwidth logging ([PR 1670](https://github.com/libp2p/rust-libp2p/pull/1670)).

# Version 0.22.0 (2020-07-17)

**NOTE**: For a smooth upgrade path from `0.21` to `> 0.22`
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ secp256k1 = ["libp2p-core/secp256k1", "libp2p-secio/secp256k1"]
all-features = true

[dependencies]
atomic = "0.4.6"
bytes = "0.5"
futures = "0.3.1"
lazy_static = "1.2"
Expand Down
153 changes: 32 additions & 121 deletions src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,27 @@
// DEALINGS IN THE SOFTWARE.

use crate::{Multiaddr, core::{Transport, transport::{ListenerEvent, TransportError}}};

use atomic::Atomic;
use futures::{prelude::*, io::{IoSlice, IoSliceMut}, ready};
use lazy_static::lazy_static;
use parking_lot::Mutex;
use smallvec::{smallvec, SmallVec};
use std::{cmp, io, pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;
use std::{
convert::TryFrom as _, io, pin::Pin, sync::{atomic::Ordering, Arc}, task::{Context, Poll}
};

/// Wraps around a `Transport` and logs the bandwidth that goes through all the opened connections.
/// Wraps around a `Transport` and counts the number of bytes that go through all the opened
/// connections.
#[derive(Clone)]
pub struct BandwidthLogging<TInner> {
inner: TInner,
sinks: Arc<BandwidthSinks>,
}

impl<TInner> BandwidthLogging<TInner> {
/// Creates a new `BandwidthLogging` around the transport.
pub fn new(inner: TInner, period: Duration) -> (Self, Arc<BandwidthSinks>) {
let mut period_seconds = cmp::min(period.as_secs(), 86400) as u32;
if period.subsec_nanos() > 0 {
period_seconds += 1;
}

/// Creates a new [`BandwidthLogging`] around the transport.
pub fn new(inner: TInner) -> (Self, Arc<BandwidthSinks>) {
let sink = Arc::new(BandwidthSinks {
download: Mutex::new(BandwidthSink::new(period_seconds)),
upload: Mutex::new(BandwidthSink::new(period_seconds)),
inbound: Atomic::new(0),
outbound: Atomic::new(0),
});

let trans = BandwidthLogging {
Expand Down Expand Up @@ -134,21 +130,29 @@ impl<TInner: TryFuture> Future for BandwidthFuture<TInner> {
}
}

/// Allows obtaining the average bandwidth of the connections created from a `BandwidthLogging`.
/// Allows obtaining the average bandwidth of the connections created from a [`BandwidthLogging`].
pub struct BandwidthSinks {
download: Mutex<BandwidthSink>,
upload: Mutex<BandwidthSink>,
inbound: Atomic<u64>,
outbound: Atomic<u64>,
}

impl BandwidthSinks {
/// Returns the average number of bytes that have been downloaded in the period.
pub fn average_download_per_sec(&self) -> u64 {
self.download.lock().get()
/// Returns the total number of bytes that have been downloaded on all the connections spawned
/// through the [`BandwidthLogging`].
///
/// > **Note**: This method is by design subject to race conditions. The returned value should
/// > only ever be used for statistics purposes.
pub fn total_inbound(&self) -> u64 {
self.inbound.load(Ordering::Relaxed)
}

/// Returns the average number of bytes that have been uploaded in the period.
pub fn average_upload_per_sec(&self) -> u64 {
self.upload.lock().get()
/// Returns the total number of bytes that have been uploaded on all the connections spawned
/// through the [`BandwidthLogging`].
///
/// > **Note**: This method is by design subject to race conditions. The returned value should
/// > only ever be used for statistics purposes.
pub fn total_outbound(&self) -> u64 {
self.outbound.load(Ordering::Relaxed)
}
}

Expand All @@ -164,14 +168,14 @@ impl<TInner: AsyncRead> AsyncRead for BandwidthConnecLogging<TInner> {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<io::Result<usize>> {
let this = self.project();
let num_bytes = ready!(this.inner.poll_read(cx, buf))?;
this.sinks.download.lock().inject(num_bytes);
this.sinks.inbound.fetch_add(u64::try_from(num_bytes).unwrap_or(u64::max_value()), Ordering::Relaxed);
Poll::Ready(Ok(num_bytes))
}

fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context, bufs: &mut [IoSliceMut]) -> Poll<io::Result<usize>> {
let this = self.project();
let num_bytes = ready!(this.inner.poll_read_vectored(cx, bufs))?;
this.sinks.download.lock().inject(num_bytes);
this.sinks.inbound.fetch_add(u64::try_from(num_bytes).unwrap_or(u64::max_value()), Ordering::Relaxed);
Poll::Ready(Ok(num_bytes))
}
}
Expand All @@ -180,14 +184,14 @@ impl<TInner: AsyncWrite> AsyncWrite for BandwidthConnecLogging<TInner> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
let this = self.project();
let num_bytes = ready!(this.inner.poll_write(cx, buf))?;
this.sinks.upload.lock().inject(num_bytes);
this.sinks.outbound.fetch_add(u64::try_from(num_bytes).unwrap_or(u64::max_value()), Ordering::Relaxed);
Poll::Ready(Ok(num_bytes))
}

fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context, bufs: &[IoSlice]) -> Poll<io::Result<usize>> {
let this = self.project();
let num_bytes = ready!(this.inner.poll_write_vectored(cx, bufs))?;
this.sinks.upload.lock().inject(num_bytes);
this.sinks.outbound.fetch_add(u64::try_from(num_bytes).unwrap_or(u64::max_value()), Ordering::Relaxed);
Poll::Ready(Ok(num_bytes))
}

Expand All @@ -201,96 +205,3 @@ impl<TInner: AsyncWrite> AsyncWrite for BandwidthConnecLogging<TInner> {
this.inner.poll_close(cx)
}
}

/// Returns the number of seconds that have elapsed between an arbitrary EPOCH and now.
fn current_second() -> u32 {
lazy_static! {
static ref EPOCH: Instant = Instant::now();
}

EPOCH.elapsed().as_secs() as u32
}

/// Structure that calculates the average bandwidth over the last few seconds.
///
/// If you want to calculate for example both download and upload bandwidths, create two different
/// objects.
struct BandwidthSink {
/// Bytes sent over the past seconds. Contains `rolling_seconds + 1` elements, where
/// `rolling_seconds` is the value passed to `new`. Only the first `rolling_seconds` elements
/// are taken into account for the average, while the last element is the element to be
/// inserted later.
bytes: SmallVec<[u64; 8]>,
/// Number of seconds between `EPOCH` and the moment we have last updated `bytes`.
latest_update: u32,
}

impl BandwidthSink {
/// Initializes a `BandwidthSink`.
fn new(seconds: u32) -> Self {
BandwidthSink {
bytes: smallvec![0; seconds as usize + 1],
latest_update: current_second(),
}
}

/// Returns the number of bytes over the last few seconds. The number of seconds is the value
/// configured at initialization.
fn get(&mut self) -> u64 {
self.update();
let seconds = self.bytes.len() - 1;
self.bytes.iter()
.take(seconds)
.fold(0u64, |a, &b| a.saturating_add(b)) / seconds as u64
}

/// Notifies the `BandwidthSink` that a certain number of bytes have been transmitted at this
/// moment.
fn inject(&mut self, bytes: usize) {
self.update();
if let Some(last) = self.bytes.last_mut() {
*last = last.saturating_add(bytes as u64);
}
}

/// Updates the state of the `BandwidthSink` so that the last element of `bytes` contains the
/// current second.
fn update(&mut self) {
let current_second = current_second();
debug_assert!(current_second >= self.latest_update);
let num_iter = cmp::min(current_second - self.latest_update, self.bytes.len() as u32);
for _ in 0..num_iter {
self.bytes.remove(0);
self.bytes.push(0);
}
self.latest_update = current_second;
}
}

#[cfg(test)]
mod tests {
use std::{thread, time::Duration};
use super::*;

#[test]
fn sink_works() {
let mut sink = BandwidthSink::new(5);
sink.inject(100);
thread::sleep(Duration::from_millis(1000));
assert_eq!(sink.get(), 20);
sink.inject(100);
thread::sleep(Duration::from_millis(1000));
assert_eq!(sink.get(), 40);
sink.inject(100);
thread::sleep(Duration::from_millis(1000));
assert_eq!(sink.get(), 60);
sink.inject(100);
thread::sleep(Duration::from_millis(1000));
assert_eq!(sink.get(), 80);
sink.inject(100);
thread::sleep(Duration::from_millis(1000));
assert_eq!(sink.get(), 100);
thread::sleep(Duration::from_millis(1000));
assert_eq!(sink.get(), 80);
}
}
10 changes: 5 additions & 5 deletions src/transport_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@
//! Provides the `TransportExt` trait.

use crate::{bandwidth::BandwidthLogging, bandwidth::BandwidthSinks, Transport};
use std::{sync::Arc, time::Duration};
use std::sync::Arc;

/// Trait automatically implemented on all objects that implement `Transport`. Provides some
/// additional utilities.
pub trait TransportExt: Transport {
/// Adds a layer on the `Transport` that logs all trafic that passes through the sockets
/// created by it.
///
/// This method returns an `Arc<BandwidthSinks>` that can be used to retreive the bandwidth
/// values.
fn with_bandwidth_logging(self, period: Duration) -> (BandwidthLogging<Self>, Arc<BandwidthSinks>)
/// This method returns an `Arc<BandwidthSinks>` that can be used to retreive the total number
/// of bytes transferred through the sockets.
fn with_bandwidth_logging(self) -> (BandwidthLogging<Self>, Arc<BandwidthSinks>)
where
Self: Sized
{
BandwidthLogging::new(self, period)
BandwidthLogging::new(self)
}

// TODO: add methods to easily upgrade for secio/mplex/yamux
Expand Down

0 comments on commit 8a08f72

Please sign in to comment.