From 8d2c57c7e25919ef63e97dda2897269a42ba0d54 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 16 Dec 2019 09:22:24 +0000 Subject: [PATCH] refactor: convert stats API to async/await Depends on: * [ ] libp2p 0.27 * [ ] https://github.com/ipfs/js-ipfs/pull/2659 * [ ] https://github.com/ipfs/js-ipfs/pull/2674 --- src/core/components/init.js | 7 ++- src/core/components/start.js | 5 ++ src/core/components/stats.js | 83 --------------------------------- src/core/components/stats/bw.js | 63 +++++++++++++++++++++++++ src/core/components/stop.js | 5 ++ 5 files changed, 79 insertions(+), 84 deletions(-) delete mode 100644 src/core/components/stats.js create mode 100644 src/core/components/stats/bw.js diff --git a/src/core/components/init.js b/src/core/components/init.js index 089d6148dc..081ed41498 100644 --- a/src/core/components/init.js +++ b/src/core/components/init.js @@ -301,7 +301,12 @@ function createApi ({ add, config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, - start + start, + stats: { + bitswap: () => { throw new NotStartedError() }, + bw: () => { throw new NotStartedError() }, + repo: Commands.repo.stat({ repo }) + } } return api diff --git a/src/core/components/start.js b/src/core/components/start.js index f9f41c7458..b18d73dfc5 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -134,6 +134,11 @@ function createApi ({ config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, start: () => apiManager.api, + stats: { + bitswap: Commands.bitswap.stat({ bitswap }), + bw: Commands.stats.bw({ libp2p }), + repo: Commands.repo.stat({ repo }) + }, stop } diff --git a/src/core/components/stats.js b/src/core/components/stats.js deleted file mode 100644 index 88c19b352e..0000000000 --- a/src/core/components/stats.js +++ /dev/null @@ -1,83 +0,0 @@ -'use strict' - -const callbackify = require('callbackify') -const Big = require('bignumber.js') -const Pushable = require('pull-pushable') -const human = require('human-to-milliseconds') -const toStream = require('pull-stream-to-stream') -const errCode = require('err-code') - -function bandwidthStats (self, opts) { - let stats - - if (opts.peer) { - stats = self.libp2p.stats.forPeer(opts.peer) - } else if (opts.proto) { - stats = self.libp2p.stats.forProtocol(opts.proto) - } else { - stats = self.libp2p.stats.global - } - - if (!stats) { - return { - totalIn: new Big(0), - totalOut: new Big(0), - rateIn: new Big(0), - rateOut: new Big(0) - } - } - - const snapshot = stats.snapshot - const movingAverages = stats.movingAverages - - return { - totalIn: snapshot.dataReceived, - totalOut: snapshot.dataSent, - rateIn: new Big(movingAverages.dataReceived['60000'].movingAverage() / 60), - rateOut: new Big(movingAverages.dataSent['60000'].movingAverage() / 60) - } -} - -module.exports = function stats (self) { - const _bwPullStream = (opts) => { - opts = opts || {} - let interval = null - const stream = Pushable(true, () => { - if (interval) { - clearInterval(interval) - } - }) - - if (opts.poll) { - let value - try { - value = human(opts.interval || '1s') - } catch (err) { - // Pull stream expects async work, so we need to simulate it. - process.nextTick(() => { - stream.end(errCode(err, 'ERR_INVALID_POLL_INTERVAL')) - }) - } - - interval = setInterval(() => { - stream.push(bandwidthStats(self, opts)) - }, value) - } else { - stream.push(bandwidthStats(self, opts)) - stream.end() - } - - return stream.source - } - - return { - bitswap: require('./bitswap')(self).stat, - repo: require('./repo')(self).stat, - bw: callbackify.variadic(async (opts) => { // eslint-disable-line require-await - opts = opts || {} - return bandwidthStats(self, opts) - }), - bwReadableStream: (opts) => toStream.source(_bwPullStream(opts)), - bwPullStream: _bwPullStream - } -} diff --git a/src/core/components/stats/bw.js b/src/core/components/stats/bw.js new file mode 100644 index 0000000000..d7f72dae3e --- /dev/null +++ b/src/core/components/stats/bw.js @@ -0,0 +1,63 @@ +'use strict' + +const Big = require('bignumber.js') +const human = require('human-to-milliseconds') +const errCode = require('err-code') + +function getBandwidthStats (libp2p, opts) { + let stats + + if (opts.peer) { + stats = libp2p.metrics.forPeer(opts.peer) + } else if (opts.proto) { + stats = libp2p.metrics.forProtocol(opts.proto) + } else { + stats = libp2p.metrics.global + } + + if (!stats) { + return { + totalIn: new Big(0), + totalOut: new Big(0), + rateIn: new Big(0), + rateOut: new Big(0) + } + } + + const { movingAverages, snapshot } = stats + + return { + totalIn: snapshot.dataReceived, + totalOut: snapshot.dataSent, + rateIn: new Big(movingAverages.dataReceived[60000].movingAverage() / 60), + rateOut: new Big(movingAverages.dataSent[60000].movingAverage() / 60) + } +} + +module.exports = ({ libp2p }) => { + return async function * (options) { + options = options || {} + + if (!options.poll) { + yield getBandwidthStats(libp2p, options) + return + } + + let interval + try { + interval = human(options.interval || '1s') + } catch (err) { + throw errCode(err, 'ERR_INVALID_POLL_INTERVAL') + } + + let timeoutId + try { + while (true) { + yield getBandwidthStats(libp2p, options) + await new Promise(resolve => { timeoutId = setTimeout(resolve, interval) }) + } + } finally { + clearTimeout(timeoutId) + } + } +} diff --git a/src/core/components/stop.js b/src/core/components/stop.js index 4e2a9bb036..2c8cdfa040 100644 --- a/src/core/components/stop.js +++ b/src/core/components/stop.js @@ -101,6 +101,11 @@ function createApi ({ config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, start, + stats: { + bitswap: () => { throw new NotStartedError() }, + bw: () => { throw new NotStartedError() }, + repo: Commands.repo.stat({ repo }) + }, stop: () => apiManager.api }