feat: add metrics to track queue sizes and add operations - #AES-361 (#3282)

* feat: add metrics to track queue sizes and add operations

* correct names

* fix: prettier

* fix: no separate metric for task queue needed

* trigger tests
gvelez17 authored Oct 8, 2024
1 parent 8916c5b commit 60a7a9e
Showing 4 changed files with 20 additions and 0 deletions.
1 change: 1 addition & 0 deletions DEVELOPMENT.md
@@ -133,3 +133,4 @@ $ npm run publish:hotfix # create and publish the release
The main step here is `npm run publish:hotfix`, which creates a full release on NPM by bumping the patch version (e.g. `2.1.9` => `2.1.10`). It also creates a git commit with updated package versions as well as their corresponding git tags on github. Make sure to set the [GH_TOKEN](https://github.com/lerna/lerna/tree/master/commands/version#--create-release-type) environment variable and log into npm before you run this command.

After the release, don't forget to make a post in the #releases channel of the Ceramic discord to notify the community about the new release!

6 changes: 6 additions & 0 deletions packages/cli/src/s3-store.ts
@@ -13,6 +13,7 @@ import PQueue from 'p-queue'
import AWSSDK from 'aws-sdk'
import { Mutex } from 'await-semaphore'
import type { DeepNonNullable } from 'ts-essentials'
import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'

/**
* **Remove** `undefined` fields from an S3 Level search params.
@@ -30,6 +31,9 @@ function definiteSearchParams<T extends Partial<StoreSearchParams>>(obj: T): Dee
*/
const MAX_LOAD_RPS = 4000

const LOAD_S3_QUEUE_ADD = 'load_s3_queue_add'
const LOAD_S3_QUEUE_SIZE = 'load_s3_queue_size'

export class S3KVFactory implements IKVFactory {
readonly #networkName: string
readonly #bucketName: string
@@ -185,6 +189,8 @@ class S3KVStore implements IKVStore {
}

get(key: string): Promise<any> {
Metrics.count(LOAD_S3_QUEUE_ADD, 1)
Metrics.observe(LOAD_S3_QUEUE_SIZE, this.#loadingLimit.size)
return this.#loadingLimit.add(async () => {
const value = await this.level.get(key)
return JSON.parse(value)
9 changes: 9 additions & 0 deletions packages/core/src/state-management/named-task-queue.ts
@@ -1,4 +1,9 @@
import { noop, TaskQueue } from '../ancillary/task-queue.js'
import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'

const NAMED_TASK_QUEUE_SIZE = 'named_task_queue_size'
const NAMED_TASK_QUEUE_RUN = 'named_task_queue_run'
const NAMED_TASK_QUEUE_ADD = 'named_task_queue_add'

/**
* Set of named PQueues.
@@ -49,6 +54,8 @@ export class NamedTaskQueue {
*/
run<A>(name: string, task: () => Promise<A>): Promise<A> {
const queue = this.queue(name)
Metrics.observe(NAMED_TASK_QUEUE_SIZE, queue.size, { name: name })
Metrics.count(NAMED_TASK_QUEUE_RUN, 1, { name: name })
return queue.run(task).finally(() => {
this.remove(name)
})
@@ -62,6 +69,8 @@ export class NamedTaskQueue {
*/
add(name: string, task: () => Promise<void>): void {
const queue = this.queue(name)
Metrics.observe(NAMED_TASK_QUEUE_SIZE, queue.size, { name: name })
Metrics.count(NAMED_TASK_QUEUE_ADD, 1, { name: name })
queue.add(
() => task(),
() => this.remove(name)
4 changes: 4 additions & 0 deletions packages/indexing/src/history-sync/workers/rebuild-anchor.ts
@@ -13,12 +13,15 @@ import type { Worker, Job } from '@ceramicnetwork/job-queue'
import { CID } from 'multiformats/cid'
import { pathString } from '@ceramicnetwork/anchor-utils'
import PQueue from 'p-queue'
import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'

// NOTE: In V' history sync will need to be reworked (ex. use CAR files, use recon)

// Up to 1024 streams could be present in an anchor
const IPFS_LOAD_CONCURRENCY = 16

const REBUILD_ANCHOR_QUEUE_SIZE = 'rebuild_anchor_queue_size'

const REBUILD_ANCHOR_JOB_OPTIONS: SendOptions = {
retryLimit: 5,
retryDelay: 60, // 1 minute
@@ -151,6 +154,7 @@ export class RebuildAnchorWorker implements Worker<RebuildAnchorJobData> {

const queue = new PQueue({ concurrency: IPFS_LOAD_CONCURRENCY })
await queue.addAll(tasks)
Metrics.observe(REBUILD_ANCHOR_QUEUE_SIZE, queue.size)

this.logger.debug(
`Rebuild anchor job completed for models ${jobData.models}, root ${jobData.root}, and txHash ${jobData.txHash}`
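
The pattern this commit repeats (count the add, then sample the queue size just before enqueuing) can be sketched in isolation. This is a minimal illustration, not the project's code: `InMemoryMetrics` is a hypothetical stand-in for `ServiceMetrics` that only mirrors the `count` and `observe` call shapes seen in the diff, `SimpleQueue` is a hypothetical synchronous queue standing in for `PQueue`, and the metric names are invented for the example.

```typescript
// Hypothetical stand-in for ServiceMetrics: only the call shapes used in the diff.
type Labels = Record<string, string>

class InMemoryMetrics {
  readonly counters = new Map<string, number>()
  readonly observations = new Map<string, number[]>()

  // Accumulate a counter by `value`.
  count(name: string, value: number, _labels?: Labels): void {
    this.counters.set(name, (this.counters.get(name) || 0) + value)
  }

  // Record one sampled value.
  observe(name: string, value: number, _labels?: Labels): void {
    const seen = this.observations.get(name) || []
    seen.push(value)
    this.observations.set(name, seen)
  }
}

// Hypothetical synchronous queue; `size` is the number of tasks still waiting.
class SimpleQueue {
  private readonly pending: Array<() => void> = []

  get size(): number {
    return this.pending.length
  }

  add(task: () => void): void {
    this.pending.push(task)
  }

  // Run every waiting task in FIFO order.
  drain(): void {
    let task = this.pending.shift()
    while (task) {
      task()
      task = this.pending.shift()
    }
  }
}

// Metric names invented for this example.
const QUEUE_ADD = 'example_queue_add'
const QUEUE_SIZE = 'example_queue_size'

// Record the add and the size seen *before* enqueuing, as the diff does in `get` and `add`.
function instrumentedAdd(metrics: InMemoryMetrics, queue: SimpleQueue, task: () => void): void {
  metrics.count(QUEUE_ADD, 1)
  metrics.observe(QUEUE_SIZE, queue.size)
  queue.add(task)
}

const metrics = new InMemoryMetrics()
const queue = new SimpleQueue()
for (let i = 0; i < 3; i++) {
  instrumentedAdd(metrics, queue, () => {})
}
queue.drain()
```

In this sketch the three samples taken before each add are 0, 1 and 2, while `queue.size` read after `drain()` is 0, so the point at which the size is sampled determines what the metric can show.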
