From 340e8f016bdce3856860b21ceeb094b282be3333 Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Mon, 9 Sep 2024 12:43:22 +0200 Subject: [PATCH] Add hypercore_round_trip_time_avg_seconds --- README.md | 2 ++ index.js | 22 +++++++++++++++++++++- package.json | 2 ++ test.js | 34 ++++++++++++++++++++++++++++------ 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index e6cff60..d2aaf78 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ Stats for Hypercores, with Prometheus support. +Assumes the hypercores are replicated over UDX streams. This is the case for all normal use-cases of hypercores (like replicating across hyperdht/hyperswarm). + ## Install ``` diff --git a/index.js b/index.js index cbd8abf..81adc39 100644 --- a/index.js +++ b/index.js @@ -53,6 +53,10 @@ class HypercoreStats { return this._getStats().totalMaxInflightBlocks } + getAvgRoundTripTimeMs () { + return this._getStats().avgRoundTripTimeMs + } + // getTotalBlocksUploaded () { // return this._getStats().totalBlocksUploaded // } @@ -190,6 +194,13 @@ class HypercoreStats { } }) + new promClient.Gauge({ // eslint-disable-line no-new + name: 'hypercore_round_trip_time_avg_seconds', + help: 'Average round-trip time (rtt) for the open replication streams', + collect () { + this.set(self.getAvgRoundTripTimeMs() / 1000) + } + }) /* new promClient.Gauge({ // eslint-disable-line no-new name: 'hypercore_total_blocks_uploaded', @@ -386,6 +397,7 @@ class HypercoreStatsSnapshot { this.totalLength = 0 this.totalInflightBlocks = 0 this.totalMaxInflightBlocks = 0 + this._totalRoundTripTime = 0 // this.totalBlocksUploaded = 0 // this.totalBlocksDownloaded = 0 // this.totalBytesUploaded = 0 @@ -398,6 +410,12 @@ class HypercoreStatsSnapshot { return this._totalPeersConns.size } + get avgRoundTripTimeMs () { + return this.totalPeers === 0 + ? 0 + : this._totalRoundTripTime / this.totalPeers + } + calculate () { this.totalCores = this.cores.length @@ -428,9 +446,11 @@ class HypercoreStatsSnapshot { } for (const peer of core.peers) { + const udxStream = peer.stream.rawStream this.totalInflightBlocks += peer.inflight - this._totalPeersConns.add(peer.stream.rawStream) + this._totalPeersConns.add(udxStream) this.totalMaxInflightBlocks += peer.getMaxInflight() + this._totalRoundTripTime += udxStream.rtt } } } diff --git a/package.json b/package.json index 4167b67..2b0543b 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,8 @@ "brittle": "^3.6.1", "corestore": "^6.18.4", "hypercore": "^10.37.19", + "hyperdht": "^6.16.5", + "hyperswarm": "^4.8.2", "prom-client": "^15.1.3", "random-access-memory": "^6.2.1", "standard": "^17.1.0" diff --git a/test.js b/test.js index bd91b43..11eb525 100644 --- a/test.js +++ b/test.js @@ -3,6 +3,8 @@ const Corestore = require('corestore') const RAM = require('random-access-memory') const HypercoreStats = require('.') const promClient = require('prom-client') +const setupTestnet = require('hyperdht/testnet') +const Hyperswarm = require('hyperswarm') const DEBUG = false @@ -33,6 +35,7 @@ test('Can register and get prometheus metrics', async (t) => { t.is(getMetricValue(lines, 'hypercore_total_inflight_blocks'), 0, 'hypercore_total_inflight_blocks init 0') t.is(getMetricValue(lines, 'hypercore_total_max_inflight_blocks'), 0, 'hypercore_total_max_inflight_blocks init 0') t.is(getMetricValue(lines, 'hypercore_total_peers'), 0, 'hypercore_total_peers init 0') + t.is(getMetricValue(lines, 'hypercore_round_trip_time_avg_seconds'), 0, 'hypercore_round_trip_time_avg_seconds init 0') // t.is(getMetricValue(lines, 'hypercore_total_blocks_downloaded'), 0, 'hypercore_total_blocks_downloaded init 0') // t.is(getMetricValue(lines, 'hypercore_total_blocks_uploaded'), 0, 'hypercore_total_blocks_uploaded init 0') @@ -65,10 +68,24 @@ test('Can register and get prometheus metrics', async (t) => { const readCore = store2.get({ key: core.key }) await readCore.ready() - // Add error handlers if it turns out these can error - const s1 = core.replicate(true) - const s2 = readCore.replicate(false) - s1.pipe(s2).pipe(s1) + // Some stats come from udx, so we need UDX streams + // instead of directly replicating the hypercores. + // Easiest to just use hyperswarm for that. + const testnet = await setupTestnet() + const bootstrap = testnet.bootstrap + const swarm1 = new Hyperswarm({ bootstrap }) + swarm1.on('connection', (conn) => { + store.replicate(conn) + }) + const swarm2 = new Hyperswarm({ bootstrap }) + swarm2.on('connection', (conn) => { + store2.replicate(conn) + }) + + swarm1.join(core.discoveryKey) + await swarm1.flush() + swarm2.join(core.discoveryKey) + await new Promise(resolve => setImmediate(resolve)) await readCore.get(0) // DEVNOTE: The precise lifecycle of when a peer is added to @@ -88,6 +105,7 @@ test('Can register and get prometheus metrics', async (t) => { // TODO: proper test of inflight metrics t.is(getMetricValue(lines, 'hypercore_total_length'), 3, 'hypercore_total_length') t.is(getMetricValue(lines, 'hypercore_total_peers'), 1, 'hypercore_total_peers') + t.is(getMetricValue(lines, 'hypercore_round_trip_time_avg_seconds') > 0, true, 'hypercore_round_trip_time_avg_seconds') // t.is(getMetricValue(lines, 'hypercore_total_blocks_downloaded'), 0, 'hypercore_total_blocks_downloaded') // t.is(getMetricValue(lines, 'hypercore_total_blocks_uploaded'), 1, 'hypercore_total_blocks_uploaded') // t.is(getMetricValue(lines, 'hypercore_total_bytes_downloaded'), 0, 'hypercore_total_bytes_downloaded') @@ -99,6 +117,10 @@ test('Can register and get prometheus metrics', async (t) => { t.ok(getMetricValue(lines, 'hypercore_total_wire_request_received') > 0, 'hypercore_total_wire_request_received') t.ok(getMetricValue(lines, 'hypercore_total_wire_range_transmitted') > 0, 'hypercore_total_wire_range_transmitted') } + + await swarm1.destroy() + await swarm2.destroy() + await testnet.destroy() }) test('Cache-expiry logic', async (t) => { @@ -116,7 +138,7 @@ test('Cache-expiry logic', async (t) => { const metrics = await promClient.register.metrics() const lines = metrics.split('\n') - if (DEBUG) console.log(metrics) + // if (DEBUG) console.log(metrics) t.is(getMetricValue(lines, 'hypercore_total_cores'), 0, 'init 0 (sanity check)') } @@ -154,7 +176,7 @@ function getMetricValue (lines, name) { const match = lines.find((l) => l.startsWith(`${name} `)) if (!match) throw new Error(`No match for ${name}`) - const value = parseInt(match.split(' ')[1]) + const value = parseFloat(match.split(' ')[1]) if (DEBUG) console.log(name, '->', value) return value