Calc stats efficiently with cache
HDegroote committed Aug 20, 2024
1 parent 09b77ec commit 48d16e1
Showing 2 changed files with 108 additions and 32 deletions.
101 changes: 69 additions & 32 deletions index.js
@@ -1,56 +1,61 @@
class HypercoreStats {
- constructor () {
+ constructor ({ cacheExpiryMs = 5000 } = {}) {
this.cores = []
+ this.cacheExpiryMs = cacheExpiryMs
+
+ // DEVNOTE: We calculate the stats all at once to avoid iterating over
+ // all cores and their peers multiple times (once per metric)
+ // However, prometheus' scrape model does not support any state.
+ // So there is no explicit way to precalculate all stats for one scrape
+ // As a workaround, we cache the calculated stats for a short time
+ // (but a lot longer than a scrape action should take)
+ // That way a single scrape action should return stats
+ // calculated from the same snapshot.
+ // The edge cases happen when:
+ // - 2 scrape requests arrive within less (or not much more) time than the cacheExpiry
+ // - The stats api is accessed programmatically outside of prometheus scraping
+ // - scraping takes > cacheExpiry (but that should never be the case)
+ // The edge cases aren't dramatic: it just means that different stats are taken 5s apart
+ this._cachedStats = null
}

addCore (core) {
this.cores.push(core)
}

get totalCores () {
- return this.cores.length
+ return this._getStats().totalCores
}

getTotalLength () {
- return this.cores.reduce(
- (sum, core) => sum + core.length,
- 0
- )
+ return this._getStats().totalLength
}

getTotalPeers () {
- const uniquePeers = new Set()
- for (const core of this.cores) {
- for (const peer of core.peers) {
- // rawStream shared means it's the same socket
- // (easy way to count peers multiplexing over many cores only once)
- uniquePeers.add(peer.stream.rawStream)
- }
- }
-
- return uniquePeers.size
+ return this._getStats().totalPeers
}

getTotalInflightBlocks () {
- return this.cores.reduce(
- (sum, core) => {
- return sum + core.peers.reduce(
- (coreSum, peer) => coreSum + peer.inflight,
- 0
- )
- }, 0
- )
+ return this._getStats().totalInflightBlocks
}

getTotalMaxInflightBlocks () {
- return this.cores.reduce(
- (sum, core) => {
- return sum + core.peers.reduce(
- (coreSum, peer) => coreSum + peer.getMaxInflight(),
- 0
- )
- }, 0
- )
+ return this._getStats().totalMaxInflightBlocks
}

+ // Caches the result for this.cacheExpiryMs ms
+ _getStats () {
+ if (this._cachedStats && this._lastStatsCalcTime + this.cacheExpiryMs > Date.now()) {
+ return this._cachedStats
+ }
+
+ this._cachedStats = new HypercoreStatsSnapshot(this.cores)
+ this._lastStatsCalcTime = Date.now()
+ return this._cachedStats
+ }
+
+ clearCache () {
+ this._cachedStats = null
+ }

registerPrometheusMetrics (promClient) {
@@ -97,4 +102,36 @@ class HypercoreStats
}
}

+ class HypercoreStatsSnapshot {
+ constructor (cores) {
+ this.cores = cores
+
+ this._totalPeersConns = new Set()
+ this.totalCores = 0
+ this.totalLength = 0
+ this.totalInflightBlocks = 0
+ this.totalMaxInflightBlocks = 0
+
+ this.calculate()
+ }
+
+ get totalPeers () {
+ return this._totalPeersConns.size
+ }
+
+ calculate () {
+ this.totalCores = this.cores.length
+
+ for (const core of this.cores) {
+ this.totalLength += core.length
+
+ for (const peer of core.peers) {
+ this.totalInflightBlocks += peer.inflight
+ this._totalPeersConns.add(peer.stream.rawStream)
+ this.totalMaxInflightBlocks += peer.getMaxInflight()
+ }
+ }
+ }
+ }

module.exports = HypercoreStats
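
For context (not part of this commit): a minimal sketch of how the cached stats behave from the caller's side. The HypercoreStats calls come straight from the diff above; the require paths and the Corestore/RAM/prom-client setup mirror the identifiers used in test.js and are assumptions about the surrounding packages.

const Corestore = require('corestore')
const RAM = require('random-access-memory')
const promClient = require('prom-client')
const HypercoreStats = require('./index.js')

async function main () {
  const store = new Corestore(RAM)
  const core = store.get({ name: 'core' })
  await core.ready()

  // Stats are recomputed at most once per cacheExpiryMs (default 5000 ms)
  const stats = new HypercoreStats({ cacheExpiryMs: 5000 })
  stats.addCore(core)
  stats.registerPrometheusMetrics(promClient)

  // Reads within the expiry window come from the same cached snapshot
  console.log(stats.totalCores, stats.getTotalLength())

  await core.append('block')
  console.log(stats.getTotalLength()) // still the cached value

  // Force a fresh snapshot, as the updated test does before re-scraping
  stats.clearCache()
  console.log(stats.getTotalLength()) // recomputed from the cores

  // A Prometheus scrape therefore reports all metrics from one snapshot,
  // at the cost of values being up to cacheExpiryMs stale
  console.log(await promClient.register.metrics())
}

main().catch(console.error)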
39 changes: 39 additions & 0 deletions test.js
@@ -13,6 +13,9 @@ test('Can register and get prometheus metrics', async (t) => {

const stats = new HypercoreStats()
stats.registerPrometheusMetrics(promClient)
+ t.teardown(() => {
+ promClient.register.clear()
+ })

stats.addCore(core)
stats.addCore(core2)
@@ -50,6 +53,7 @@ test('Can register and get prometheus metrics', async (t) => {
await new Promise(resolve => setTimeout(resolve, 100))

{
+ stats.clearCache()
const metrics = await promClient.register.metrics()
const lines = metrics.split('\n')

@@ -70,3 +74,38 @@ function getMetricValue (lines, name) {

return value
}

+ test('Cache-expiry logic', async (t) => {
+ const store = new Corestore(RAM)
+ const core = store.get({ name: 'core' })
+
+ const stats = new HypercoreStats({ cacheExpiryMs: 1000 })
+ stats.registerPrometheusMetrics(promClient)
+ t.teardown(() => {
+ promClient.register.clear()
+ })
+
+ {
+ const metrics = await promClient.register.metrics()
+
+ const lines = metrics.split('\n')
+ if (DEBUG) console.log(metrics)
+ t.is(getMetricValue(lines, 'hypercore_total_cores'), 0, 'init 0 (sanity check)')
+ }
+
+ stats.addCore(core)
+ {
+ const metrics = await promClient.register.metrics()
+
+ const lines = metrics.split('\n')
+ t.is(getMetricValue(lines, 'hypercore_total_cores'), 0, 'still cached 0 value')
+ }
+
+ await new Promise(resolve => setTimeout(resolve, 1000))
+ {
+ const metrics = await promClient.register.metrics()
+
+ const lines = metrics.split('\n')
+ t.is(getMetricValue(lines, 'hypercore_total_cores'), 1, 'cache busted after expire time')
+ }
+ })
