Skip to content

Commit

Permalink
fix: track stream metrics (#2)
Browse files Browse the repository at this point in the history
Adds missing stream metrics
  • Loading branch information
achingbrain authored Sep 26, 2023
1 parent a31a9a7 commit caafb3d
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 4 deletions.
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@
},
"dependencies": {
"@libp2p/interface": "^0.1.2",
"@libp2p/logger": "^3.0.2"
"@libp2p/logger": "^3.0.2",
"it-foreach": "^2.0.4",
"it-stream-types": "^2.0.1"
},
"devDependencies": {
"aegir": "^40.0.13",
Expand Down
46 changes: 44 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
*/

import { logger } from '@libp2p/logger'
import each from 'it-foreach'
import type { MultiaddrConnection, Stream, Connection } from '@libp2p/interface/connection'
import type { Startable } from '@libp2p/interface/dist/src/startable'
import type { Metric, MetricGroup, StopTimer, Metrics, CalculatedMetricOptions, MetricOptions, Counter, CounterGroup, CalculateMetric } from '@libp2p/interface/metrics'
import type { Duplex, Source } from 'it-stream-types'

const log = logger('libp2p:simple-metrics')

Expand Down Expand Up @@ -114,6 +116,7 @@ export interface SimpleMetricsInit {

class SimpleMetrics implements Metrics, Startable {
public metrics = new Map<string, DefaultMetric | DefaultGroupMetric | CalculateMetric>()
private readonly transferStats: Map<string, number>
private started: boolean
private interval?: ReturnType<typeof setInterval>
private readonly intervalMs: number
Expand All @@ -126,6 +129,9 @@ class SimpleMetrics implements Metrics, Startable {

this.intervalMs = init.intervalMs ?? 1000
this.onMetrics = init.onMetrics

// holds global and per-protocol sent/received stats
this.transferStats = new Map()
}

isStarted (): boolean {
Expand Down Expand Up @@ -158,19 +164,55 @@ class SimpleMetrics implements Metrics, Startable {
}
}

this.onMetrics(JSON.parse(JSON.stringify(output)))
this.onMetrics(structuredClone(output))
})
.catch(err => {
log.error('could not invoke onMetrics callback', err)
})
}

trackMultiaddrConnection (maConn: MultiaddrConnection): void {
/**
* Increment the transfer stat for the passed key, making sure
* it exists first
*/
_incrementValue (key: string, value: number): void {
const existing = this.transferStats.get(key) ?? 0

this.transferStats.set(key, existing + value)
}

/**
* Override the sink/source of the stream to count the bytes
* in and out
*/
_track (stream: Duplex<Source<any>>, name: string): void {
const self = this

const sink = stream.sink
stream.sink = async function trackedSink (source) {
await sink(each(source, buf => {
self._incrementValue(`${name} sent`, buf.byteLength)
}))
}

const source = stream.source
stream.source = each(source, buf => {
self._incrementValue(`${name} received`, buf.byteLength)
})
}

trackMultiaddrConnection (maConn: MultiaddrConnection): void {
this._track(maConn, 'global')
}

trackProtocolStream (stream: Stream, connection: Connection): void {
if (stream.protocol == null) {
// protocol not negotiated yet, should not happen as the upgrader
// calls this handler after protocol negotiation
return
}

this._track(stream, stream.protocol)
}

registerMetric (name: string, opts: CalculatedMetricOptions): void
Expand Down
29 changes: 28 additions & 1 deletion test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('simple-metrics', () => {
})

it('should invoke the onMetrics callback', async () => {
const deferred = pDefer()
const deferred = pDefer<Record<string, any>>()

s = simpleMetrics({
onMetrics: (metrics) => {
Expand All @@ -28,4 +28,31 @@ describe('simple-metrics', () => {
const metrics = await deferred.promise
expect(metrics).to.be.ok()
})

it('should not allow altering internal state', async () => {
const deferred = pDefer()
const list: Array<Record<string, any>> = []

s = simpleMetrics({
onMetrics: (metrics) => {
list.push(metrics)

if (list.length === 2) {
deferred.resolve()
}
},
intervalMs: 10
})({})

const group = s.registerMetricGroup('foo')
group.update({ bar: 5 })

await start(s)

await deferred.promise

list[0].foo.baz = 'qux'

expect(list).to.not.have.nested.property('[1].foo.baz')
})
})

0 comments on commit caafb3d

Please sign in to comment.