diff --git a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts index 1c80d8de60471..c34ac346acce0 100644 --- a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts +++ b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts @@ -1,6 +1,10 @@ import { mock } from 'jest-mock-extended'; import config from '@/config'; -import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; +import { + CLOUD_TEMP_PRODUCTION_LIMIT, + CLOUD_TEMP_REPORTABLE_THRESHOLDS, + ConcurrencyControlService, +} from '@/concurrency/concurrency-control.service'; import type { Logger } from '@/Logger'; import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error'; import { ConcurrencyQueue } from '../concurrency-queue'; @@ -366,4 +370,91 @@ describe('ConcurrencyControlService', () => { }); }); }); + + // ---------------------------------- + // telemetry + // ---------------------------------- + + describe('telemetry', () => { + describe('on cloud', () => { + test.each(CLOUD_TEMP_REPORTABLE_THRESHOLDS)( + 'for capacity %d, should report temp cloud threshold if reached', + (threshold) => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT); + config.set('deployment.type', 'cloud'); + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + + /** + * Act + */ + // @ts-expect-error Private property + service.productionQueue.emit('concurrency-check', { + capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, + }); + + /** + * Assert + */ + expect(telemetry.track).toHaveBeenCalledWith('User hit concurrency limit', { threshold }); + }, + ); + + test.each(CLOUD_TEMP_REPORTABLE_THRESHOLDS.map((t) => t - 1))( + 'for capacity %d, should not report temp cloud threshold if not reached', + (threshold) => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT); + config.set('deployment.type', 'cloud'); + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + + /** + * Act + */ + // @ts-expect-error Private property + service.productionQueue.emit('concurrency-check', { + capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, + }); + + /** + * Assert + */ + expect(telemetry.track).not.toHaveBeenCalledWith('User hit concurrency limit', { + threshold, + }); + }, + ); + + test.each(CLOUD_TEMP_REPORTABLE_THRESHOLDS.map((t) => t + 1))( + 'for capacity %d, should not report temp cloud threshold if exceeded', + (threshold) => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT); + config.set('deployment.type', 'cloud'); + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + + /** + * Act + */ + // @ts-expect-error Private property + service.productionQueue.emit('concurrency-check', { + capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, + }); + + /** + * Assert + */ + expect(telemetry.track).not.toHaveBeenCalledWith('User hit concurrency limit', { + threshold, + }); + }, + ); + }); + }); }); diff --git a/packages/cli/src/concurrency/__tests__/concurrency-queue.test.ts b/packages/cli/src/concurrency/__tests__/concurrency-queue.test.ts index 3488e0b492ccb..9d192a4035142 100644 --- a/packages/cli/src/concurrency/__tests__/concurrency-queue.test.ts +++ b/packages/cli/src/concurrency/__tests__/concurrency-queue.test.ts @@ -1,3 +1,4 @@ +import { sleep } from 'n8n-workflow'; import { ConcurrencyQueue } from '../concurrency-queue'; describe('ConcurrencyQueue', () => { @@ -10,12 +11,12 @@ describe('ConcurrencyQueue', () => { const state: Record = {}; // eslint-disable-next-line @typescript-eslint/promise-function-async - const sleep = jest.fn(() => new Promise((resolve) => setTimeout(resolve, 500))); + const sleepSpy = jest.fn(() => sleep(500)); const testFn = async (item: { executionId: string }) => { await queue.enqueue(item.executionId); state[item.executionId] = 'started'; - await sleep(); + await sleepSpy(); queue.dequeue(); state[item.executionId] = 'finished'; }; @@ -29,33 +30,46 @@ describe('ConcurrencyQueue', () => { ]); // At T+0 seconds this method hasn't yielded to the event-loop, so no `testFn` calls are made - expect(sleep).toHaveBeenCalledTimes(0); + expect(sleepSpy).toHaveBeenCalledTimes(0); expect(state).toEqual({}); // At T+0.4 seconds the first `testFn` has been called, but hasn't resolved await jest.advanceTimersByTimeAsync(400); - expect(sleep).toHaveBeenCalledTimes(1); + expect(sleepSpy).toHaveBeenCalledTimes(1); expect(state).toEqual({ 1: 'started' }); // At T+0.5 seconds the first promise has resolved, and the second one has stared await jest.advanceTimersByTimeAsync(100); - expect(sleep).toHaveBeenCalledTimes(2); + expect(sleepSpy).toHaveBeenCalledTimes(2); expect(state).toEqual({ 1: 'finished', 2: 'started' }); // At T+1 seconds the first two promises have resolved, and the third one has stared await jest.advanceTimersByTimeAsync(500); - expect(sleep).toHaveBeenCalledTimes(3); + expect(sleepSpy).toHaveBeenCalledTimes(3); expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'started' }); // If the fourth promise is removed, the fifth one is started in the next tick queue.remove('4'); await jest.advanceTimersByTimeAsync(1); - expect(sleep).toHaveBeenCalledTimes(4); + expect(sleepSpy).toHaveBeenCalledTimes(4); expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'started', 5: 'started' }); // at T+5 seconds, all but the fourth promise should be resolved await jest.advanceTimersByTimeAsync(4000); - expect(sleep).toHaveBeenCalledTimes(4); + expect(sleepSpy).toHaveBeenCalledTimes(4); expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'finished', 5: 'finished' }); }); + + it('should debounce emitting of the `concurrency-check` event', async () => { + const queue = new ConcurrencyQueue(10); + const emitSpy = jest.fn(); + queue.on('concurrency-check', emitSpy); + + // eslint-disable-next-line @typescript-eslint/promise-function-async + Array.from({ length: 10 }, (_, i) => i).forEach(() => queue.enqueue('1')); + + expect(queue.currentCapacity).toBe(0); + await jest.advanceTimersByTimeAsync(1000); + expect(emitSpy).toHaveBeenCalledTimes(1); + }); }); diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index 21429c00ed815..4f42a7b45c292 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -9,6 +9,9 @@ import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; import type { IExecutingWorkflowData } from '@/Interfaces'; import { Telemetry } from '@/telemetry'; +export const CLOUD_TEMP_PRODUCTION_LIMIT = 999; +export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200]; + @Service() export class ConcurrencyControlService { private readonly isEnabled: boolean; @@ -17,7 +20,9 @@ export class ConcurrencyControlService { private readonly productionQueue: ConcurrencyQueue; - private readonly limitsToReport = [5, 10, 20, 50, 100, 200]; + private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map( + (t) => CLOUD_TEMP_PRODUCTION_LIMIT - t, + ); constructor( private readonly logger: Logger, @@ -46,19 +51,17 @@ export class ConcurrencyControlService { this.isEnabled = true; - this.productionQueue.on( - 'execution-throttled', - async ({ executionId, capacity }: { executionId: string; capacity: number }) => { - this.log('Execution throttled', { executionId }); - - /** - * Temporary until base data for cloud plans is collected. - */ - if (this.shouldReport(capacity)) { - await this.telemetry.track('User hit concurrency limit', { threshold: capacity }); - } - }, - ); + this.productionQueue.on('concurrency-check', ({ capacity }: { capacity: number }) => { + if (this.shouldReport(capacity)) { + void this.telemetry.track('User hit concurrency limit', { + threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity, + }); + } + }); + + this.productionQueue.on('execution-throttled', ({ executionId }: { executionId: string }) => { + this.log('Execution throttled', { executionId }); + }); this.productionQueue.on('execution-released', async (executionId: string) => { this.log('Execution released', { executionId }); diff --git a/packages/cli/src/concurrency/concurrency-queue.ts b/packages/cli/src/concurrency/concurrency-queue.ts index 220d2a2363dab..571d1593e86b9 100644 --- a/packages/cli/src/concurrency/concurrency-queue.ts +++ b/packages/cli/src/concurrency/concurrency-queue.ts @@ -1,5 +1,6 @@ import { Service } from 'typedi'; import { EventEmitter } from 'node:events'; +import debounce from 'lodash/debounce'; @Service() export class ConcurrencyQueue extends EventEmitter { @@ -15,14 +16,20 @@ export class ConcurrencyQueue extends EventEmitter { async enqueue(executionId: string) { this.capacity--; + this.debouncedEmit('concurrency-check', { capacity: this.capacity }); + if (this.capacity < 0) { - this.emit('execution-throttled', { executionId, capacity: this.capacity }); + this.emit('execution-throttled', { executionId }); // eslint-disable-next-line @typescript-eslint/return-await return new Promise((resolve) => this.queue.push({ executionId, resolve })); } } + get currentCapacity() { + return this.capacity; + } + dequeue() { this.capacity++; @@ -56,4 +63,9 @@ export class ConcurrencyQueue extends EventEmitter { resolve(); } + + private debouncedEmit = debounce( + (event: string, payload: object) => this.emit(event, payload), + 300, + ); }