Skip to content
This repository has been archived by the owner on May 16, 2024. It is now read-only.

fix: track stream metrics #2

Merged
merged 2 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@
},
"dependencies": {
"@libp2p/interface": "^0.1.2",
"@libp2p/logger": "^3.0.2"
"@libp2p/logger": "^3.0.2",
"it-foreach": "^2.0.4",
"it-stream-types": "^2.0.1"
},
"devDependencies": {
"aegir": "^40.0.13",
Expand Down
46 changes: 44 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
*/

import { logger } from '@libp2p/logger'
import each from 'it-foreach'
import type { MultiaddrConnection, Stream, Connection } from '@libp2p/interface/connection'
import type { Startable } from '@libp2p/interface/dist/src/startable'
import type { Metric, MetricGroup, StopTimer, Metrics, CalculatedMetricOptions, MetricOptions, Counter, CounterGroup, CalculateMetric } from '@libp2p/interface/metrics'
import type { Duplex, Source } from 'it-stream-types'

const log = logger('libp2p:simple-metrics')

Expand Down Expand Up @@ -114,6 +116,7 @@ export interface SimpleMetricsInit {

class SimpleMetrics implements Metrics, Startable {
public metrics = new Map<string, DefaultMetric | DefaultGroupMetric | CalculateMetric>()
private readonly transferStats: Map<string, number>
private started: boolean
private interval?: ReturnType<typeof setInterval>
private readonly intervalMs: number
Expand All @@ -126,6 +129,9 @@ class SimpleMetrics implements Metrics, Startable {

this.intervalMs = init.intervalMs ?? 1000
this.onMetrics = init.onMetrics

// holds global and per-protocol sent/received stats
this.transferStats = new Map()
}

isStarted (): boolean {
Expand Down Expand Up @@ -158,19 +164,55 @@ class SimpleMetrics implements Metrics, Startable {
}
}

this.onMetrics(JSON.parse(JSON.stringify(output)))
this.onMetrics(structuredClone(output))
})
.catch(err => {
log.error('could not invoke onMetrics callback', err)
})
}

trackMultiaddrConnection (maConn: MultiaddrConnection): void {
/**
* Increment the transfer stat for the passed key, making sure
* it exists first
*/
_incrementValue (key: string, value: number): void {
const existing = this.transferStats.get(key) ?? 0

this.transferStats.set(key, existing + value)
}

/**
* Override the sink/source of the stream to count the bytes
* in and out
*/
_track (stream: Duplex<Source<any>>, name: string): void {
const self = this

const sink = stream.sink
stream.sink = async function trackedSink (source) {
await sink(each(source, buf => {
self._incrementValue(`${name} sent`, buf.byteLength)
}))
}

const source = stream.source
stream.source = each(source, buf => {
self._incrementValue(`${name} received`, buf.byteLength)
})
}

trackMultiaddrConnection (maConn: MultiaddrConnection): void {
this._track(maConn, 'global')
}

trackProtocolStream (stream: Stream, connection: Connection): void {
if (stream.protocol == null) {
// protocol not negotiated yet, should not happen as the upgrader
// calls this handler after protocol negotiation
return
}

this._track(stream, stream.protocol)
}

registerMetric (name: string, opts: CalculatedMetricOptions): void
Expand Down
29 changes: 28 additions & 1 deletion test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('simple-metrics', () => {
})

it('should invoke the onMetrics callback', async () => {
const deferred = pDefer()
const deferred = pDefer<Record<string, any>>()

s = simpleMetrics({
onMetrics: (metrics) => {
Expand All @@ -28,4 +28,31 @@ describe('simple-metrics', () => {
const metrics = await deferred.promise
expect(metrics).to.be.ok()
})

it('should not allow altering internal state', async () => {
const deferred = pDefer()
const list: Array<Record<string, any>> = []

s = simpleMetrics({
onMetrics: (metrics) => {
list.push(metrics)

if (list.length === 2) {
deferred.resolve()
}
},
intervalMs: 10
})({})

const group = s.registerMetricGroup('foo')
group.update({ bar: 5 })

await start(s)

await deferred.promise

list[0].foo.baz = 'qux'

expect(list).to.not.have.nested.property('[1].foo.baz')
})
})