diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 4ca80dd94c..8f5badfaed 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -133,3 +133,4 @@ $ npm run publish:hotfix # create and publish the release The main step here is `npm run publish:hotfix`, which creates a full release on NPM by bumping the patch version (e.g. `2.1.9` => `2.1.10`). It also creates a git commit with updated package versions as well as their corresponding git tags on github. Make sure to set the [GH_TOKEN](https://github.com/lerna/lerna/tree/master/commands/version#--create-release-type) environment variable and log into npm before you run this command. After the release, don't forget to make a post in the #releases channel of the Ceramic discord to notify the community about the new release! + diff --git a/packages/cli/src/s3-store.ts b/packages/cli/src/s3-store.ts index e4e46daa14..f491a5f551 100644 --- a/packages/cli/src/s3-store.ts +++ b/packages/cli/src/s3-store.ts @@ -13,6 +13,7 @@ import PQueue from 'p-queue' import AWSSDK from 'aws-sdk' import { Mutex } from 'await-semaphore' import type { DeepNonNullable } from 'ts-essentials' +import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability' /** * **Remove** `undefined` fields from an S3 Level search params. @@ -30,6 +31,9 @@ function definiteSearchParams>(obj: T): Dee */ const MAX_LOAD_RPS = 4000 +const LOAD_S3_QUEUE_ADD = 'load_s3_queue_add' +const LOAD_S3_QUEUE_SIZE = 'load_s3_queue_size' + export class S3KVFactory implements IKVFactory { readonly #networkName: string readonly #bucketName: string @@ -185,6 +189,8 @@ class S3KVStore implements IKVStore { } get(key: string): Promise { + Metrics.count(LOAD_S3_QUEUE_ADD, 1) + Metrics.observe(LOAD_S3_QUEUE_SIZE, this.#loadingLimit.size) return this.#loadingLimit.add(async () => { const value = await this.level.get(key) return JSON.parse(value) diff --git a/packages/core/src/state-management/named-task-queue.ts b/packages/core/src/state-management/named-task-queue.ts index 223bae996a..965a205506 100644 --- a/packages/core/src/state-management/named-task-queue.ts +++ b/packages/core/src/state-management/named-task-queue.ts @@ -1,4 +1,9 @@ import { noop, TaskQueue } from '../ancillary/task-queue.js' +import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability' + +const NAMED_TASK_QUEUE_SIZE = 'named_task_queue_size' +const NAMED_TASK_QUEUE_RUN = 'named_task_queue_run' +const NAMED_TASK_QUEUE_ADD = 'named_task_queue_add' /** * Set of named PQueues. @@ -49,6 +54,8 @@ export class NamedTaskQueue { */ run(name: string, task: () => Promise): Promise { const queue = this.queue(name) + Metrics.observe(NAMED_TASK_QUEUE_SIZE, queue.size, { name: name }) + Metrics.count(NAMED_TASK_QUEUE_RUN, 1, { name: name }) return queue.run(task).finally(() => { this.remove(name) }) @@ -62,6 +69,8 @@ export class NamedTaskQueue { */ add(name: string, task: () => Promise): void { const queue = this.queue(name) + Metrics.observe(NAMED_TASK_QUEUE_SIZE, queue.size, { name: name }) + Metrics.count(NAMED_TASK_QUEUE_ADD, 1, { name: name }) queue.add( () => task(), () => this.remove(name) diff --git a/packages/indexing/src/history-sync/workers/rebuild-anchor.ts b/packages/indexing/src/history-sync/workers/rebuild-anchor.ts index 5095b2998a..98bff57f79 100644 --- a/packages/indexing/src/history-sync/workers/rebuild-anchor.ts +++ b/packages/indexing/src/history-sync/workers/rebuild-anchor.ts @@ -13,12 +13,15 @@ import type { Worker, Job } from '@ceramicnetwork/job-queue' import { CID } from 'multiformats/cid' import { pathString } from '@ceramicnetwork/anchor-utils' import PQueue from 'p-queue' +import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability' // NOTE: In V' history sync will need to be reworked (ex. use CAR files, use recon) // Up to 1024 streams could be present in an anchor const IPFS_LOAD_CONCURRENCY = 16 +const REBUILD_ANCHOR_QUEUE_SIZE = 'rebuild_anchor_queue_size' + const REBUILD_ANCHOR_JOB_OPTIONS: SendOptions = { retryLimit: 5, retryDelay: 60, // 1 minute @@ -151,6 +154,7 @@ export class RebuildAnchorWorker implements Worker { const queue = new PQueue({ concurrency: IPFS_LOAD_CONCURRENCY }) await queue.addAll(tasks) + Metrics.observe(REBUILD_ANCHOR_QUEUE_SIZE, queue.size) this.logger.debug( `Rebuild anchor job completed for models ${jobData.models}, root ${jobData.root}, and txHash ${jobData.txHash}`