diff --git a/packages/kbn-babel-preset/node_preset.js b/packages/kbn-babel-preset/node_preset.js index 5ef4219df59f7..257863faa737f 100644 --- a/packages/kbn-babel-preset/node_preset.js +++ b/packages/kbn-babel-preset/node_preset.js @@ -18,6 +18,25 @@ */ module.exports = (_, options = {}) => { + const overrides = []; + if (!process.env.ALLOW_PERFORMANCE_HOOKS_IN_TASK_MANAGER) { + overrides.push( + { + test: [/x-pack[\/\\]legacy[\/\\]plugins[\/\\]task_manager/], + plugins: [ + [ + 'filter-imports', + { + imports: { + perf_hooks: ['performance'], + }, + }, + ], + ], + } + ); + } + return { presets: [ [ @@ -39,7 +58,7 @@ module.exports = (_, options = {}) => { modules: 'cjs', corejs: 3, - ...(options['@babel/preset-env'] || {}) + ...(options['@babel/preset-env'] || {}), }, ], require('./common_preset'), @@ -48,9 +67,10 @@ module.exports = (_, options = {}) => { [ require.resolve('babel-plugin-transform-define'), { - 'global.__BUILT_WITH_BABEL__': 'true' - } - ] - ] + 'global.__BUILT_WITH_BABEL__': 'true', + }, + ], + ], + overrides, }; }; diff --git a/packages/kbn-babel-preset/package.json b/packages/kbn-babel-preset/package.json index 4b18357745360..c22cf175b29e5 100644 --- a/packages/kbn-babel-preset/package.json +++ b/packages/kbn-babel-preset/package.json @@ -8,10 +8,11 @@ "@babel/plugin-syntax-dynamic-import": "^7.2.0", "@babel/plugin-transform-modules-commonjs": "^7.5.0", "@babel/preset-env": "^7.5.5", - "@babel/preset-react":"^7.0.0", + "@babel/preset-react": "^7.0.0", "@babel/preset-typescript": "^7.3.3", "@kbn/elastic-idx": "1.0.0", "babel-plugin-add-module-exports": "^1.0.2", + "babel-plugin-filter-imports": "^3.0.0", "babel-plugin-transform-define": "^1.3.1", "babel-plugin-typescript-strip-namespaces": "^1.1.1" } diff --git a/x-pack/.i18nrc.json b/x-pack/.i18nrc.json index 0ee50c0caa340..6d0da2f0b693d 100644 --- a/x-pack/.i18nrc.json +++ b/x-pack/.i18nrc.json @@ -34,6 +34,7 @@ "xpack.server": "legacy/server", "xpack.snapshotRestore": "legacy/plugins/snapshot_restore", "xpack.spaces": ["legacy/plugins/spaces", "plugins/spaces"], + "xpack.taskManager": "legacy/plugins/task_manager", "xpack.transform": "legacy/plugins/transform", "xpack.upgradeAssistant": "legacy/plugins/upgrade_assistant", "xpack.uptime": "legacy/plugins/uptime", diff --git a/x-pack/legacy/plugins/actions/server/builtin_action_types/lib/result_type.ts b/x-pack/legacy/plugins/actions/server/builtin_action_types/lib/result_type.ts index c891f3bf218e7..256463251315d 100644 --- a/x-pack/legacy/plugins/actions/server/builtin_action_types/lib/result_type.ts +++ b/x-pack/legacy/plugins/actions/server/builtin_action_types/lib/result_type.ts @@ -8,16 +8,15 @@ // Which is basically the Haskel equivalent of Rust/ML/Scala's Result // I'll reach out to other's in Kibana to see if we can merge these into one type -// eslint-disable-next-line @typescript-eslint/consistent-type-definitions -export type Ok = { +export interface Ok { tag: 'ok'; value: T; -}; -// eslint-disable-next-line @typescript-eslint/consistent-type-definitions -export type Err = { +} + +export interface Err { tag: 'err'; error: E; -}; +} export type Result = Ok | Err; export function asOk(value: T): Ok { diff --git a/x-pack/legacy/plugins/task_manager/lib/fill_pool.test.ts b/x-pack/legacy/plugins/task_manager/lib/fill_pool.test.ts index 860a637d58108..d7ac8d227fc4c 100644 --- a/x-pack/legacy/plugins/task_manager/lib/fill_pool.test.ts +++ b/x-pack/legacy/plugins/task_manager/lib/fill_pool.test.ts @@ -7,13 +7,14 @@ import _ from 'lodash'; import sinon from 'sinon'; import { fillPool } from './fill_pool'; +import { TaskPoolRunResult } from '../task_pool'; describe('fillPool', () => { test('stops filling when there are no more tasks in the store', async () => { const tasks = [[1, 2, 3], [4, 5]]; let index = 0; const fetchAvailableTasks = async () => tasks[index++] || []; - const run = sinon.spy(async () => true); + const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks); const converter = _.identity; await fillPool(run, fetchAvailableTasks, converter); @@ -25,7 +26,7 @@ describe('fillPool', () => { const tasks = [[1, 2, 3], [4, 5]]; let index = 0; const fetchAvailableTasks = async () => tasks[index++] || []; - const run = sinon.spy(async () => false); + const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = _.identity; await fillPool(run, fetchAvailableTasks, converter); @@ -37,7 +38,7 @@ describe('fillPool', () => { const tasks = [[1, 2, 3], [4, 5]]; let index = 0; const fetchAvailableTasks = async () => tasks[index++] || []; - const run = sinon.spy(async () => false); + const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = (x: number) => x.toString(); await fillPool(run, fetchAvailableTasks, converter); @@ -47,7 +48,7 @@ describe('fillPool', () => { describe('error handling', () => { test('throws exception from fetchAvailableTasks', async () => { - const run = sinon.spy(async () => false); + const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = (x: number) => x.toString(); try { @@ -80,7 +81,7 @@ describe('fillPool', () => { const tasks = [[1, 2, 3], [4, 5]]; let index = 0; const fetchAvailableTasks = async () => tasks[index++] || []; - const run = sinon.spy(async () => false); + const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = (x: number) => { throw new Error(`can not convert ${x}`); }; diff --git a/x-pack/legacy/plugins/task_manager/lib/fill_pool.ts b/x-pack/legacy/plugins/task_manager/lib/fill_pool.ts index a5970574abaaf..6fe965e048ea5 100644 --- a/x-pack/legacy/plugins/task_manager/lib/fill_pool.ts +++ b/x-pack/legacy/plugins/task_manager/lib/fill_pool.ts @@ -4,7 +4,15 @@ * you may not use this file except in compliance with the Elastic License. */ -type BatchRun = (tasks: T[]) => Promise; +import { performance } from 'perf_hooks'; +import { TaskPoolRunResult } from '../task_pool'; + +export enum FillPoolResult { + NoTasksClaimed = 'NoTasksClaimed', + RanOutOfCapacity = 'RanOutOfCapacity', +} + +type BatchRun = (tasks: T[]) => Promise; type Fetcher = () => Promise; type Converter = (t: T1) => T2; @@ -24,18 +32,32 @@ export async function fillPool( run: BatchRun, fetchAvailableTasks: Fetcher, converter: Converter -): Promise { +): Promise { + performance.mark('fillPool.start'); while (true) { const instances = await fetchAvailableTasks(); if (!instances.length) { - return; + performance.mark('fillPool.bailNoTasks'); + performance.measure( + 'fillPool.activityDurationUntilNoTasks', + 'fillPool.start', + 'fillPool.bailNoTasks' + ); + return FillPoolResult.NoTasksClaimed; } const tasks = instances.map(converter); - if (!(await run(tasks))) { - return; + if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) { + performance.mark('fillPool.bailExhaustedCapacity'); + performance.measure( + 'fillPool.activityDurationUntilExhaustedCapacity', + 'fillPool.start', + 'fillPool.bailExhaustedCapacity' + ); + return FillPoolResult.RanOutOfCapacity; } + performance.mark('fillPool.cycle'); } } diff --git a/x-pack/legacy/plugins/task_manager/lib/middleware.test.ts b/x-pack/legacy/plugins/task_manager/lib/middleware.test.ts index 07afee1797462..699765f16d83e 100644 --- a/x-pack/legacy/plugins/task_manager/lib/middleware.test.ts +++ b/x-pack/legacy/plugins/task_manager/lib/middleware.test.ts @@ -70,6 +70,7 @@ describe('addMiddlewareToChain', () => { return opts; }, beforeRun: defaultBeforeRun, + beforeMarkRunning: defaultBeforeRun, }; const m2 = { beforeSave: async (opts: BeforeSaveOpts) => { @@ -77,6 +78,7 @@ describe('addMiddlewareToChain', () => { return opts; }, beforeRun: defaultBeforeRun, + beforeMarkRunning: defaultBeforeRun, }; const m3 = { beforeSave: async (opts: BeforeSaveOpts) => { @@ -84,6 +86,7 @@ describe('addMiddlewareToChain', () => { return opts; }, beforeRun: defaultBeforeRun, + beforeMarkRunning: defaultBeforeRun, }; let middlewareChain; @@ -119,6 +122,7 @@ describe('addMiddlewareToChain', () => { m1: true, }; }, + beforeMarkRunning: defaultBeforeRun, }; const m2 = { beforeSave: defaultBeforeSave, @@ -128,6 +132,7 @@ describe('addMiddlewareToChain', () => { m2: true, }; }, + beforeMarkRunning: defaultBeforeRun, }; const m3 = { beforeSave: defaultBeforeSave, @@ -137,6 +142,7 @@ describe('addMiddlewareToChain', () => { m3: true, }; }, + beforeMarkRunning: defaultBeforeRun, }; let middlewareChain; diff --git a/x-pack/legacy/plugins/task_manager/lib/middleware.ts b/x-pack/legacy/plugins/task_manager/lib/middleware.ts index d81b76fda7e50..d367c8ca56c09 100644 --- a/x-pack/legacy/plugins/task_manager/lib/middleware.ts +++ b/x-pack/legacy/plugins/task_manager/lib/middleware.ts @@ -23,10 +23,12 @@ export type BeforeSaveFunction = ( ) => Promise; export type BeforeRunFunction = (params: RunContext) => Promise; +export type BeforeMarkRunningFunction = (params: RunContext) => Promise; export interface Middleware { beforeSave: BeforeSaveFunction; beforeRun: BeforeRunFunction; + beforeMarkRunning: BeforeMarkRunningFunction; } export function addMiddlewareToChain(prevMiddleware: Middleware, middleware: Middleware) { @@ -39,8 +41,14 @@ export function addMiddlewareToChain(prevMiddleware: Middleware, middleware: Mid ? (params: RunContext) => middleware.beforeRun(params).then(prevMiddleware.beforeRun) : prevMiddleware.beforeRun; + const beforeMarkRunning = middleware.beforeMarkRunning + ? (params: RunContext) => + middleware.beforeMarkRunning(params).then(prevMiddleware.beforeMarkRunning) + : prevMiddleware.beforeMarkRunning; + return { beforeSave, beforeRun, + beforeMarkRunning, }; } diff --git a/x-pack/legacy/plugins/task_manager/task_manager.test.ts b/x-pack/legacy/plugins/task_manager/task_manager.test.ts index 3961dcafffdca..9ae2f5e1e027b 100644 --- a/x-pack/legacy/plugins/task_manager/task_manager.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_manager.test.ts @@ -174,6 +174,7 @@ describe('TaskManager', () => { const middleware = { beforeSave: async (saveOpts: any) => saveOpts, beforeRun: async (runOpts: any) => runOpts, + beforeMarkRunning: async (runOpts: any) => runOpts, }; expect(() => client.addMiddleware(middleware)).not.toThrow(); }); @@ -183,6 +184,7 @@ describe('TaskManager', () => { const middleware = { beforeSave: async (saveOpts: any) => saveOpts, beforeRun: async (runOpts: any) => runOpts, + beforeMarkRunning: async (runOpts: any) => runOpts, }; client.start(); @@ -241,7 +243,10 @@ describe('TaskManager', () => { claimAvailableTasks(claim, 10, logger); - sinon.assert.calledWithMatch(logger.warn, /inline scripts/); + expect(logger.warn).toHaveBeenCalledTimes(1); + expect(logger.warn.mock.calls[0][0]).toMatchInlineSnapshot( + `"Task Manager cannot operate when inline scripts are disabled in Elasticsearch"` + ); }); }); }); diff --git a/x-pack/legacy/plugins/task_manager/task_manager.ts b/x-pack/legacy/plugins/task_manager/task_manager.ts index 219c525aea23f..4ddb18c7cfe74 100644 --- a/x-pack/legacy/plugins/task_manager/task_manager.ts +++ b/x-pack/legacy/plugins/task_manager/task_manager.ts @@ -3,10 +3,10 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ - +import { performance } from 'perf_hooks'; import { SavedObjectsClientContract, SavedObjectsSerializer } from 'src/core/server'; import { Logger } from './types'; -import { fillPool } from './lib/fill_pool'; +import { fillPool, FillPoolResult } from './lib/fill_pool'; import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware'; import { sanitizeTaskDefinitions } from './lib/sanitize_task_definitions'; import { intervalFromNow } from './lib/intervals'; @@ -56,13 +56,14 @@ export class TaskManager { private readonly pollerInterval: number; private definitions: TaskDictionary; private store: TaskStore; - private poller: TaskPoller; + private poller: TaskPoller; private logger: Logger; private pool: TaskPool; private startQueue: Array<() => void> = []; private middleware = { beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts, beforeRun: async (runOpts: RunContext) => runOpts, + beforeMarkRunning: async (runOpts: RunContext) => runOpts, }; /** @@ -86,8 +87,6 @@ export class TaskManager { this.logger.info(`TaskManager is identified by the Kibana UUID: ${taskManagerId}`); } - /* Kibana UUID needs to be pulled live (not cached), as it takes a long time - * to initialize, and can change after startup */ const store = new TaskStore({ serializer: opts.serializer, savedObjectsRepository: opts.savedObjectsRepository, @@ -109,13 +108,14 @@ export class TaskManager { store, definitions: this.definitions, beforeRun: this.middleware.beforeRun, + beforeMarkRunning: this.middleware.beforeMarkRunning, }); - const poller = new TaskPoller({ + const poller = new TaskPoller({ logger: this.logger, pollInterval: opts.config.get('xpack.task_manager.poll_interval'), - work: (): Promise => + work: (): Promise => fillPool( - pool.run, + async tasks => await pool.run(tasks), () => claimAvailableTasks( this.store.claimAvailableTasks.bind(this.store), @@ -260,12 +260,24 @@ export async function claimAvailableTasks( logger: Logger ) { if (availableWorkers > 0) { + performance.mark('claimAvailableTasks_start'); + try { const { docs, claimedTasks } = await claim({ size: availableWorkers, claimOwnershipUntil: intervalFromNow('30s')!, }); + if (claimedTasks === 0) { + performance.mark('claimAvailableTasks.noTasks'); + } + performance.mark('claimAvailableTasks_stop'); + performance.measure( + 'claimAvailableTasks', + 'claimAvailableTasks_start', + 'claimAvailableTasks_stop' + ); + if (docs.length !== claimedTasks) { logger.warn( `[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched` @@ -282,6 +294,7 @@ export async function claimAvailableTasks( } } } else { + performance.mark('claimAvailableTasks.noAvailableWorkers'); logger.info( `[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers. If this happens often, consider adjusting the "xpack.task_manager.max_workers" configuration.` ); diff --git a/x-pack/legacy/plugins/task_manager/task_poller.test.ts b/x-pack/legacy/plugins/task_manager/task_poller.test.ts index 478c1a4dc1b17..88bcf29ec6084 100644 --- a/x-pack/legacy/plugins/task_manager/task_poller.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_poller.test.ts @@ -73,7 +73,9 @@ describe('TaskPoller', () => { await doneWorking; expect(count).toEqual(2); - sinon.assert.calledWithMatch(logger.error, /Dang it/i); + expect(logger.error.mock.calls[0][0]).toMatchInlineSnapshot( + `"Failed to poll for work: Error: Dang it!"` + ); }); test('is stoppable', async () => { diff --git a/x-pack/legacy/plugins/task_manager/task_poller.ts b/x-pack/legacy/plugins/task_manager/task_poller.ts index ae023f96a4064..0f7b49f17872a 100644 --- a/x-pack/legacy/plugins/task_manager/task_poller.ts +++ b/x-pack/legacy/plugins/task_manager/task_poller.ts @@ -8,27 +8,28 @@ * This module contains the logic for polling the task manager index for new work. */ +import { performance } from 'perf_hooks'; import { Logger } from './types'; -type WorkFn = () => Promise; +type WorkFn = () => Promise; -interface Opts { +interface Opts { pollInterval: number; logger: Logger; - work: WorkFn; + work: WorkFn; } /** * Performs work on a scheduled interval, logging any errors. This waits for work to complete * (or error) prior to attempting another run. */ -export class TaskPoller { +export class TaskPoller { private isStarted = false; private isWorking = false; private timeout: any; private pollInterval: number; private logger: Logger; - private work: WorkFn; + private work: WorkFn; /** * Constructs a new TaskPoller. @@ -38,7 +39,7 @@ export class TaskPoller { * @prop {Logger} logger - The task manager logger * @prop {WorkFn} work - An empty, asynchronous function that performs the desired work */ - constructor(opts: Opts) { + constructor(opts: Opts) { this.pollInterval = opts.pollInterval; this.logger = opts.logger; this.work = opts.work; @@ -57,8 +58,16 @@ export class TaskPoller { const poll = async () => { await this.attemptWork(); + performance.mark('TaskPoller.sleep'); if (this.isStarted) { - this.timeout = setTimeout(poll, this.pollInterval); + this.timeout = setTimeout( + tryAndLogOnError(() => { + performance.mark('TaskPoller.poll'); + performance.measure('TaskPoller.sleepDuration', 'TaskPoller.sleep', 'TaskPoller.poll'); + poll(); + }, this.logger), + this.pollInterval + ); } }; @@ -94,3 +103,13 @@ export class TaskPoller { } } } + +function tryAndLogOnError(fn: Function, logger: Logger): Function { + return () => { + try { + fn(); + } catch (err) { + logger.error(`Task Poller polling phase failed: ${err}`); + } + }; +} diff --git a/x-pack/legacy/plugins/task_manager/task_pool.test.ts b/x-pack/legacy/plugins/task_manager/task_pool.test.ts index e6a83dd1911bd..4967f4383294f 100644 --- a/x-pack/legacy/plugins/task_manager/task_pool.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_pool.test.ts @@ -5,7 +5,7 @@ */ import sinon from 'sinon'; -import { TaskPool } from './task_pool'; +import { TaskPool, TaskPoolRunResult } from './task_pool'; import { mockLogger, resolvable, sleep } from './test_utils'; describe('TaskPool', () => { @@ -17,7 +17,7 @@ describe('TaskPool', () => { const result = await pool.run([{ ...mockTask() }, { ...mockTask() }, { ...mockTask() }]); - expect(result).toBeTruthy(); + expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks); expect(pool.occupiedWorkers).toEqual(3); }); @@ -29,7 +29,7 @@ describe('TaskPool', () => { const result = await pool.run([{ ...mockTask() }, { ...mockTask() }, { ...mockTask() }]); - expect(result).toBeTruthy(); + expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks); expect(pool.availableWorkers).toEqual(7); }); @@ -48,10 +48,74 @@ describe('TaskPool', () => { { ...mockTask(), run: shouldNotRun }, ]); - expect(result).toBeFalsy(); + expect(result).toEqual(TaskPoolRunResult.RanOutOfCapacity); expect(pool.availableWorkers).toEqual(0); - sinon.assert.calledTwice(shouldRun); - sinon.assert.notCalled(shouldNotRun); + expect(shouldRun).toHaveBeenCalledTimes(2); + expect(shouldNotRun).not.toHaveBeenCalled(); + }); + + test('should log when marking a Task as running fails', async () => { + const logger = mockLogger(); + const pool = new TaskPool({ + maxWorkers: 2, + logger, + }); + + const taskFailedToMarkAsRunning = mockTask(); + taskFailedToMarkAsRunning.markTaskAsRunning.mockImplementation(async () => { + throw new Error(`Mark Task as running has failed miserably`); + }); + + const result = await pool.run([mockTask(), taskFailedToMarkAsRunning, mockTask()]); + + expect(logger.error.mock.calls[0]).toMatchInlineSnapshot(` + Array [ + "Failed to mark Task TaskType \\"shooooo\\" as running: Mark Task as running has failed miserably", + ] + `); + + expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks); + }); + + test('should log when running a Task fails', async () => { + const logger = mockLogger(); + const pool = new TaskPool({ + maxWorkers: 3, + logger, + }); + + const taskFailedToRun = mockTask(); + taskFailedToRun.run.mockImplementation(async () => { + throw new Error(`Run Task has failed miserably`); + }); + + const result = await pool.run([mockTask(), taskFailedToRun, mockTask()]); + + expect(logger.warn.mock.calls[0]).toMatchInlineSnapshot(` + Array [ + "Task TaskType \\"shooooo\\" failed in attempt to run: Run Task has failed miserably", + ] + `); + + expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks); + }); + + test('Running a task which fails still takes up capacity', async () => { + const logger = mockLogger(); + const pool = new TaskPool({ + maxWorkers: 1, + logger, + }); + + const taskFailedToRun = mockTask(); + taskFailedToRun.run.mockImplementation(async () => { + await sleep(0); + throw new Error(`Run Task has failed miserably`); + }); + + const result = await pool.run([taskFailedToRun, mockTask()]); + + expect(result).toEqual(TaskPoolRunResult.RanOutOfCapacity); }); test('clears up capacity when a task completes', async () => { @@ -78,7 +142,7 @@ describe('TaskPool', () => { { ...mockTask(), run: secondRun }, ]); - expect(result).toBeFalsy(); + expect(result).toEqual(TaskPoolRunResult.RanOutOfCapacity); expect(pool.occupiedWorkers).toEqual(1); expect(pool.availableWorkers).toEqual(0); @@ -133,7 +197,7 @@ describe('TaskPool', () => { }, ]); - expect(result).toBeTruthy(); + expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks); expect(pool.occupiedWorkers).toEqual(2); expect(pool.availableWorkers).toEqual(0); @@ -173,7 +237,7 @@ describe('TaskPool', () => { }, ]); - expect(result).toBeTruthy(); + expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks); await pool.run([]); expect(pool.occupiedWorkers).toEqual(0); @@ -181,11 +245,13 @@ describe('TaskPool', () => { // Allow the task to cancel... await cancelled; - sinon.assert.calledWithMatch(logger.error, /Failed to cancel task "shooooo!"/); + expect(logger.error.mock.calls[0][0]).toMatchInlineSnapshot( + `"Failed to cancel task \\"shooooo!\\": Error: Dern!"` + ); }); function mockRun() { - return sinon.spy(async () => { + return jest.fn(async () => { await sleep(0); return { state: {} }; }); @@ -195,8 +261,9 @@ describe('TaskPool', () => { return { isExpired: false, cancel: async () => undefined, - markTaskAsRunning: async () => true, + markTaskAsRunning: jest.fn(async () => true), run: mockRun(), + toString: () => `TaskType "shooooo"`, }; } }); diff --git a/x-pack/legacy/plugins/task_manager/task_pool.ts b/x-pack/legacy/plugins/task_manager/task_pool.ts index 7afbec65a0d8c..5828cb0df4a4d 100644 --- a/x-pack/legacy/plugins/task_manager/task_pool.ts +++ b/x-pack/legacy/plugins/task_manager/task_pool.ts @@ -8,7 +8,7 @@ * This module contains the logic that ensures we don't run too many * tasks at once in a given Kibana instance. */ - +import { performance } from 'perf_hooks'; import { Logger } from './types'; import { TaskRunner } from './task_runner'; @@ -17,6 +17,13 @@ interface Opts { logger: Logger; } +export enum TaskPoolRunResult { + RunningAllClaimedTasks = 'RunningAllClaimedTasks', + RanOutOfCapacity = 'RanOutOfCapacity', +} + +const VERSION_CONFLICT_MESSAGE = 'Task has been claimed by another Kibana service'; + /** * Runs tasks in batches, taking costs into account. */ @@ -66,38 +73,64 @@ export class TaskPool { }; public cancelRunningTasks() { - this.logger.debug(`Cancelling running tasks.`); + this.logger.debug('Cancelling running tasks.'); for (const task of this.running) { this.cancelTask(task); } } - private async attemptToRun(tasks: TaskRunner[]) { - for (const task of tasks) { - if (this.availableWorkers > 0) { - if (await task.markTaskAsRunning()) { - this.running.add(task); - task - .run() - .catch(err => { - this.logger.warn(`Task ${task} failed in attempt to run: ${err.message}`); - }) - .then(() => this.running.delete(task)); - } else { - this.logger.warn(`Failed to mark Task ${task} as running`); - } - } else { - return false; + private async attemptToRun(tasks: TaskRunner[]): Promise { + const [tasksToRun, leftOverTasks] = partitionListByCount(tasks, this.availableWorkers); + if (tasksToRun.length) { + performance.mark('attemptToRun_start'); + await Promise.all( + tasksToRun.map( + async task => + await task + .markTaskAsRunning() + .then((hasTaskBeenMarkAsRunning: boolean) => + hasTaskBeenMarkAsRunning + ? this.handleMarkAsRunning(task) + : this.handleFailureOfMarkAsRunning(task, { + name: 'TaskPoolVersionConflictError', + message: VERSION_CONFLICT_MESSAGE, + }) + ) + .catch(ex => this.handleFailureOfMarkAsRunning(task, ex)) + ) + ); + + performance.mark('attemptToRun_stop'); + performance.measure('taskPool.attemptToRun', 'attemptToRun_start', 'attemptToRun_stop'); + } + + if (leftOverTasks.length) { + if (this.availableWorkers) { + return this.attemptToRun(leftOverTasks); } + return TaskPoolRunResult.RanOutOfCapacity; } + return TaskPoolRunResult.RunningAllClaimedTasks; + } + + private handleMarkAsRunning(task: TaskRunner) { + this.running.add(task); + task + .run() + .catch(err => { + this.logger.warn(`Task ${task.toString()} failed in attempt to run: ${err.message}`); + }) + .then(() => this.running.delete(task)); + } - return true; + private handleFailureOfMarkAsRunning(task: TaskRunner, err: Error) { + this.logger.error(`Failed to mark Task ${task.toString()} as running: ${err.message}`); } private cancelExpiredTasks() { for (const task of this.running) { if (task.isExpired) { - this.logger.debug(`Cancelling expired task ${task}.`); + this.logger.debug(`Cancelling expired task ${task.toString()}.`); this.cancelTask(task); } } @@ -105,11 +138,16 @@ export class TaskPool { private async cancelTask(task: TaskRunner) { try { - this.logger.debug(`Cancelling task ${task}.`); + this.logger.debug(`Cancelling task ${task.toString()}.`); this.running.delete(task); await task.cancel(); } catch (err) { - this.logger.error(`Failed to cancel task ${task}: ${err}`); + this.logger.error(`Failed to cancel task ${task.toString()}: ${err}`); } } } + +function partitionListByCount(list: T[], count: number): [T[], T[]] { + const listInCount = list.splice(0, count); + return [listInCount, list]; +} diff --git a/x-pack/legacy/plugins/task_manager/task_runner.test.ts b/x-pack/legacy/plugins/task_manager/task_runner.test.ts index 49c55279eafe7..578b86ba0b3f6 100644 --- a/x-pack/legacy/plugins/task_manager/task_runner.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_runner.test.ts @@ -10,6 +10,7 @@ import { minutesFromNow } from './lib/intervals'; import { ConcreteTaskInstance } from './task'; import { TaskManagerRunner } from './task_runner'; import { mockLogger } from './test_utils'; +import { SavedObjectsErrorHelpers } from '../../../../src/core/server/saved_objects/service/lib/errors'; let fakeTimer: sinon.SinonFakeTimers; @@ -202,7 +203,7 @@ describe('TaskManagerRunner', () => { await promise; expect(wasCancelled).toBeTruthy(); - sinon.assert.neverCalledWithMatch(logger.warn, /not cancellable/); + expect(logger.warn).not.toHaveBeenCalled(); }); test('warns if cancel is called on a non-cancellable task', async () => { @@ -220,7 +221,10 @@ describe('TaskManagerRunner', () => { await runner.cancel(); await promise; - sinon.assert.calledWithMatch(logger.warn, /not cancellable/); + expect(logger.warn).toHaveBeenCalledTimes(1); + expect(logger.warn.mock.calls[0][0]).toMatchInlineSnapshot( + `"The task bar \\"foo\\" is not cancellable."` + ); }); test('sets startedAt, status, attempts and retryAt when claiming a task', async () => { @@ -420,6 +424,74 @@ describe('TaskManagerRunner', () => { ); }); + test('it returns false when markTaskAsRunning fails due to VERSION_CONFLICT_STATUS', async () => { + const id = _.random(1, 20).toString(); + const initialAttempts = _.random(1, 3); + const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000); + const timeoutMinutes = 1; + const getRetryStub = sinon.stub().returns(nextRetry); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + interval: undefined, + }, + definitions: { + bar: { + timeout: `${timeoutMinutes}m`, + getRetry: getRetryStub, + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + }); + + store.update = sinon + .stub() + .throws( + SavedObjectsErrorHelpers.decorateConflictError(new Error('repo error')).output.payload + ); + + expect(await runner.markTaskAsRunning()).toEqual(false); + }); + + test('it throw when markTaskAsRunning fails for unexpected reasons', async () => { + const id = _.random(1, 20).toString(); + const initialAttempts = _.random(1, 3); + const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000); + const timeoutMinutes = 1; + const getRetryStub = sinon.stub().returns(nextRetry); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + interval: undefined, + }, + definitions: { + bar: { + timeout: `${timeoutMinutes}m`, + getRetry: getRetryStub, + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + }); + + store.update = sinon + .stub() + .throws(SavedObjectsErrorHelpers.createGenericNotFoundError('type', 'id').output.payload); + + return expect(runner.markTaskAsRunning()).rejects.toMatchInlineSnapshot(` + Object { + "error": "Not Found", + "message": "Saved object [type/id] not found", + "statusCode": 404, + } + `); + }); + test('uses getRetry (returning true) to set retryAt when defined', async () => { const id = _.random(1, 20).toString(); const initialAttempts = _.random(1, 3); @@ -601,6 +673,7 @@ describe('TaskManagerRunner', () => { }; const runner = new TaskManagerRunner({ beforeRun: context => Promise.resolve(context), + beforeMarkRunning: context => Promise.resolve(context), logger, store, instance: Object.assign( @@ -655,9 +728,10 @@ describe('TaskManagerRunner', () => { await runner.run(); if (shouldBeValid) { - sinon.assert.notCalled(logger.warn); + expect(logger.warn).not.toHaveBeenCalled(); } else { - sinon.assert.calledWith(logger.warn, sinon.match(/invalid task result/i)); + expect(logger.warn).toHaveBeenCalledTimes(1); + expect(logger.warn.mock.calls[0][0]).toMatch(/invalid task result/i); } } diff --git a/x-pack/legacy/plugins/task_manager/task_runner.ts b/x-pack/legacy/plugins/task_manager/task_runner.ts index a51e0c885b978..9d1431ed004e3 100644 --- a/x-pack/legacy/plugins/task_manager/task_runner.ts +++ b/x-pack/legacy/plugins/task_manager/task_runner.ts @@ -10,10 +10,11 @@ * rescheduling, middleware application, etc. */ +import { performance } from 'perf_hooks'; import Joi from 'joi'; import { intervalFromDate, intervalFromNow } from './lib/intervals'; import { Logger } from './types'; -import { BeforeRunFunction } from './lib/middleware'; +import { BeforeRunFunction, BeforeMarkRunningFunction } from './lib/middleware'; import { CancelFunction, CancellableTask, @@ -32,7 +33,7 @@ export interface TaskRunner { cancel: CancelFunction; markTaskAsRunning: () => Promise; run: () => Promise; - toString?: () => string; + toString: () => string; } interface Updatable { @@ -47,6 +48,7 @@ interface Opts { instance: ConcreteTaskInstance; store: Updatable; beforeRun: BeforeRunFunction; + beforeMarkRunning: BeforeMarkRunningFunction; } /** @@ -62,8 +64,9 @@ export class TaskManagerRunner implements TaskRunner { private instance: ConcreteTaskInstance; private definitions: TaskDictionary; private logger: Logger; - private store: Updatable; + private bufferedTaskStore: Updatable; private beforeRun: BeforeRunFunction; + private beforeMarkRunning: BeforeMarkRunningFunction; /** * Creates an instance of TaskManagerRunner. @@ -79,8 +82,9 @@ export class TaskManagerRunner implements TaskRunner { this.instance = sanitizeInstance(opts.instance); this.definitions = opts.definitions; this.logger = opts.logger; - this.store = opts.store; + this.bufferedTaskStore = opts.store; this.beforeRun = opts.beforeRun; + this.beforeMarkRunning = opts.beforeMarkRunning; } /** @@ -153,14 +157,20 @@ export class TaskManagerRunner implements TaskRunner { * @returns {Promise} */ public async markTaskAsRunning(): Promise { + performance.mark('markTaskAsRunning_start'); + const VERSION_CONFLICT_STATUS = 409; - const attempts = this.instance.attempts + 1; const now = new Date(); - const ownershipClaimedUntil = this.instance.retryAt; + const { taskInstance } = await this.beforeMarkRunning({ + taskInstance: this.instance, + }); + + const attempts = taskInstance.attempts + 1; + const ownershipClaimedUntil = taskInstance.retryAt; try { - const { id } = this.instance; + const { id } = taskInstance; const timeUntilClaimExpires = howManyMsUntilOwnershipClaimExpires(ownershipClaimedUntil); if (timeUntilClaimExpires < 0) { @@ -171,8 +181,8 @@ export class TaskManagerRunner implements TaskRunner { ); } - this.instance = await this.store.update({ - ...this.instance, + this.instance = await this.bufferedTaskStore.update({ + ...taskInstance, status: 'running', startedAt: now, attempts, @@ -198,13 +208,14 @@ export class TaskManagerRunner implements TaskRunner { ); } + performanceStopMarkingTaskAsRunning(); return true; } catch (error) { + performanceStopMarkingTaskAsRunning(); if (error.statusCode !== VERSION_CONFLICT_STATUS) { throw error; } } - return false; } @@ -263,7 +274,7 @@ export class TaskManagerRunner implements TaskRunner { runAt = intervalFromDate(startedAt, this.instance.interval)!; } - await this.store.update({ + await this.bufferedTaskStore.update({ ...this.instance, runAt, state, @@ -280,7 +291,7 @@ export class TaskManagerRunner implements TaskRunner { private async processResultWhenDone(result: RunResult): Promise { // not a recurring task: clean up by removing the task instance from store try { - await this.store.remove(this.instance.id); + await this.bufferedTaskStore.remove(this.instance.id); } catch (err) { if (err.statusCode === 404) { this.logger.warn(`Task cleanup of ${this} failed in processing. Was remove called twice?`); @@ -306,7 +317,7 @@ export class TaskManagerRunner implements TaskRunner { return 'idle'; } - const maxAttempts = this.definition.maxAttempts || this.store.maxAttempts; + const maxAttempts = this.definition.maxAttempts || this.bufferedTaskStore.maxAttempts; return this.instance.attempts < maxAttempts ? 'idle' : 'failed'; } @@ -352,3 +363,12 @@ function sanitizeInstance(instance: ConcreteTaskInstance): ConcreteTaskInstance function howManyMsUntilOwnershipClaimExpires(ownershipClaimedUntil: Date | null): number { return ownershipClaimedUntil ? ownershipClaimedUntil.getTime() - Date.now() : 0; } + +function performanceStopMarkingTaskAsRunning() { + performance.mark('markTaskAsRunning_stop'); + performance.measure( + 'taskRunner.markTaskAsRunning', + 'markTaskAsRunning_start', + 'markTaskAsRunning_stop' + ); +} diff --git a/x-pack/legacy/plugins/task_manager/task_store.test.ts b/x-pack/legacy/plugins/task_manager/task_store.test.ts index 9779dc5efd28b..46efc4bb57ba7 100644 --- a/x-pack/legacy/plugins/task_manager/task_store.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_store.test.ts @@ -9,7 +9,6 @@ import sinon from 'sinon'; import uuid from 'uuid'; import { TaskDictionary, TaskDefinition, TaskInstance, TaskStatus } from './task'; import { FetchOpts, StoreOpts, OwnershipClaimingOpts, TaskStore } from './task_store'; -import { mockLogger } from './test_utils'; import { savedObjectsClientMock } from 'src/core/server/mocks'; import { SavedObjectsSerializer, SavedObjectsSchema, SavedObjectAttributes } from 'src/core/server'; @@ -77,6 +76,7 @@ describe('TaskStore', () => { test('serializes the params and state', async () => { const task = { + id: 'id', params: { hello: 'world' }, state: { foo: 'bar' }, taskType: 'report', @@ -99,7 +99,10 @@ describe('TaskStore', () => { taskType: 'report', user: undefined, }, - {} + { + id: 'id', + refresh: false, + } ); expect(result).toEqual({ @@ -326,233 +329,6 @@ describe('TaskStore', () => { }); }); - describe('fetchAvailableTasks', () => { - async function testFetchAvailableTasks({ opts = {}, hits = [] }: any = {}) { - const callCluster = sinon.spy(async (name: string, params?: any) => ({ hits: { hits } })); - const store = new TaskStore({ - callCluster, - logger: mockLogger(), - definitions: taskDefinitions, - maxAttempts: 2, - serializer, - ...opts, - }); - - const result = await store.fetchAvailableTasks(); - - sinon.assert.calledOnce(callCluster); - sinon.assert.calledWith(callCluster, 'search'); - - return { - result, - args: callCluster.args[0][1], - }; - } - - test('it returns normally with no tasks when the index does not exist.', async () => { - const callCluster = sinon.spy(async (name: string, params?: any) => ({ hits: { hits: [] } })); - const store = new TaskStore({ - index: 'tasky', - taskManagerId: '', - serializer, - callCluster, - definitions: taskDefinitions, - maxAttempts: 2, - savedObjectsRepository: savedObjectsClient, - }); - - const result = await store.fetchAvailableTasks(); - - sinon.assert.calledOnce(callCluster); - sinon.assert.calledWithMatch(callCluster, 'search', { ignoreUnavailable: true }); - - expect(result.length).toBe(0); - }); - - test('it filters tasks by supported types, maxAttempts, and runAt', async () => { - const maxAttempts = _.random(2, 43); - const customMaxAttempts = _.random(44, 100); - const { args } = await testFetchAvailableTasks({ - opts: { - maxAttempts, - definitions: { - foo: { - type: 'foo', - title: '', - createTaskRunner: jest.fn(), - }, - bar: { - type: 'bar', - title: '', - maxAttempts: customMaxAttempts, - createTaskRunner: jest.fn(), - }, - }, - }, - }); - expect(args).toMatchObject({ - body: { - query: { - bool: { - must: [ - { term: { type: 'task' } }, - { - bool: { - must: [ - { - bool: { - should: [ - { - bool: { - must: [ - { term: { 'task.status': 'idle' } }, - { range: { 'task.runAt': { lte: 'now' } } }, - ], - }, - }, - { - bool: { - must: [ - { term: { 'task.status': 'running' } }, - { range: { 'task.retryAt': { lte: 'now' } } }, - ], - }, - }, - ], - }, - }, - { - bool: { - should: [ - { exists: { field: 'task.interval' } }, - { - bool: { - must: [ - { term: { 'task.taskType': 'foo' } }, - { - range: { - 'task.attempts': { - lt: maxAttempts, - }, - }, - }, - ], - }, - }, - { - bool: { - must: [ - { term: { 'task.taskType': 'bar' } }, - { - range: { - 'task.attempts': { - lt: customMaxAttempts, - }, - }, - }, - ], - }, - }, - ], - }, - }, - ], - }, - }, - ], - }, - }, - size: 10, - sort: { - _script: { - type: 'number', - order: 'asc', - script: { - lang: 'expression', - source: `doc['task.retryAt'].value || doc['task.runAt'].value`, - }, - }, - }, - seq_no_primary_term: true, - }, - }); - }); - - test('it returns task objects', async () => { - const runAt = new Date(); - const { result } = await testFetchAvailableTasks({ - hits: [ - { - _id: 'aaa', - _source: { - type: 'task', - task: { - runAt, - taskType: 'foo', - interval: undefined, - attempts: 0, - status: 'idle', - params: '{ "hello": "world" }', - state: '{ "baby": "Henhen" }', - user: 'jimbo', - scope: ['reporting'], - }, - }, - _seq_no: 1, - _primary_term: 2, - sort: ['a', 1], - }, - { - _id: 'bbb', - _source: { - type: 'task', - task: { - runAt, - taskType: 'bar', - interval: '5m', - attempts: 2, - status: 'running', - params: '{ "shazm": 1 }', - state: '{ "henry": "The 8th" }', - user: 'dabo', - scope: ['reporting', 'ceo'], - }, - }, - _seq_no: 3, - _primary_term: 4, - sort: ['b', 2], - }, - ], - }); - expect(result).toMatchObject([ - { - attempts: 0, - id: 'aaa', - interval: undefined, - params: { hello: 'world' }, - runAt, - scope: ['reporting'], - state: { baby: 'Henhen' }, - status: 'idle', - taskType: 'foo', - user: 'jimbo', - }, - { - attempts: 2, - id: 'bbb', - interval: '5m', - params: { shazm: 1 }, - runAt, - scope: ['reporting', 'ceo'], - state: { henry: 'The 8th' }, - status: 'running', - taskType: 'bar', - user: 'dabo', - }, - ]); - }); - }); - describe('claimAvailableTasks', () => { async function testClaimAvailableTasks({ opts = {}, @@ -927,7 +703,7 @@ describe('TaskStore', () => { user: undefined, ownerId: null, }, - { version: '123' } + { version: '123', refresh: false } ); expect(result).toEqual({ diff --git a/x-pack/legacy/plugins/task_manager/task_store.ts b/x-pack/legacy/plugins/task_manager/task_store.ts index 9b5253d8be7f9..58bffd2269eb6 100644 --- a/x-pack/legacy/plugins/task_manager/task_store.ts +++ b/x-pack/legacy/plugins/task_manager/task_store.ts @@ -70,6 +70,11 @@ export interface ClaimOwnershipResult { docs: ConcreteTaskInstance[]; } +export interface BulkUpdateTaskFailureResult { + error: NonNullable; + task: ConcreteTaskInstance; +} + export interface UpdateByQueryResult { updated: number; version_conflicts: number; @@ -107,8 +112,6 @@ export class TaskStore { this.definitions = opts.definitions; this.serializer = opts.serializer; this.savedObjectsRepository = opts.savedObjectsRepository; - - this.fetchAvailableTasks = this.fetchAvailableTasks.bind(this); } /** @@ -128,7 +131,7 @@ export class TaskStore { const savedObject = await this.savedObjectsRepository.create( 'task', taskInstanceToAttributes(taskInstance), - { id: taskInstance.id } + { id: taskInstance.id, refresh: false } ); return savedObjectToConcreteTaskInstance(savedObject); @@ -148,88 +151,6 @@ export class TaskStore { }); } - /** - * Fetches tasks from the index, which are ready to be run. - * - runAt is now or past - * - id is not currently running in this instance of Kibana - * - has a type that is in our task definitions - * - * @param {TaskQuery} query - * @prop {string[]} types - Task types to be queried - * @prop {number} size - The number of task instances to retrieve - * @returns {Promise} - */ - public async fetchAvailableTasks(): Promise { - const { docs } = await this.search({ - query: { - bool: { - must: [ - // Either a task with idle status and runAt <= now or - // status running with a retryAt <= now. - { - bool: { - should: [ - { - bool: { - must: [ - { term: { 'task.status': 'idle' } }, - { range: { 'task.runAt': { lte: 'now' } } }, - ], - }, - }, - { - bool: { - must: [ - { term: { 'task.status': 'running' } }, - { range: { 'task.retryAt': { lte: 'now' } } }, - ], - }, - }, - ], - }, - }, - // Either task has an interval or the attempts < the maximum configured - { - bool: { - should: [ - { exists: { field: 'task.interval' } }, - ...Object.entries(this.definitions).map(([type, definition]) => ({ - bool: { - must: [ - { term: { 'task.taskType': type } }, - { - range: { - 'task.attempts': { - lt: definition.maxAttempts || this.maxAttempts, - }, - }, - }, - ], - }, - })), - ], - }, - }, - ], - }, - }, - size: 10, - sort: { - _script: { - type: 'number', - order: 'asc', - script: { - lang: 'expression', - source: `doc['task.retryAt'].value || doc['task.runAt'].value`, - }, - }, - }, - seq_no_primary_term: true, - }); - - return docs; - } - /** * Claims available tasks from the index, which are ready to be run. * - runAt is now or past @@ -376,6 +297,7 @@ export class TaskStore { return docs; } + /** * Updates the specified doc in the index, returning the doc * with its version up to date. @@ -388,7 +310,10 @@ export class TaskStore { 'task', doc.id, taskInstanceToAttributes(doc), - { version: doc.version } + { + refresh: false, + version: doc.version, + } ); return savedObjectToConcreteTaskInstance(updatedSavedObject); diff --git a/x-pack/legacy/plugins/task_manager/test_utils/index.ts b/x-pack/legacy/plugins/task_manager/test_utils/index.ts index 8ee5cdce064cc..719ccadbe33dd 100644 --- a/x-pack/legacy/plugins/task_manager/test_utils/index.ts +++ b/x-pack/legacy/plugins/task_manager/test_utils/index.ts @@ -8,8 +8,6 @@ * A handful of helper functions for testing the task manager. */ -import sinon from 'sinon'; - // Caching this here to avoid setTimeout mocking affecting our tests. const nativeTimeout = setTimeout; @@ -18,10 +16,10 @@ const nativeTimeout = setTimeout; */ export function mockLogger() { return { - info: sinon.stub(), - debug: sinon.stub(), - warn: sinon.stub(), - error: sinon.stub(), + info: jest.fn(), + debug: jest.fn(), + warn: jest.fn(), + error: jest.fn(), }; } diff --git a/x-pack/scripts/functional_tests_server.js b/x-pack/scripts/functional_tests_server.js index c361f1839c50b..039da38c8f547 100644 --- a/x-pack/scripts/functional_tests_server.js +++ b/x-pack/scripts/functional_tests_server.js @@ -4,6 +4,8 @@ * you may not use this file except in compliance with the Elastic License. */ +process.env.ALLOW_PERFORMANCE_HOOKS_IN_TASK_MANAGER = true; + require('@kbn/plugin-helpers').babelRegister(); require('@kbn/test').startServersCli( require.resolve('../test/functional/config.js'), diff --git a/x-pack/test/plugin_api_perf/plugins/task_manager_performance/index.js b/x-pack/test/plugin_api_perf/plugins/task_manager_performance/index.js index 0547069dfb469..17ae9b26fa3b3 100644 --- a/x-pack/test/plugin_api_perf/plugins/task_manager_performance/index.js +++ b/x-pack/test/plugin_api_perf/plugins/task_manager_performance/index.js @@ -4,16 +4,13 @@ * you may not use this file except in compliance with the Elastic License. */ +import uuid from 'uuid'; +import _ from 'lodash'; +import stats from 'stats-lite'; +import prettyMilliseconds from 'pretty-ms'; +import { performance, PerformanceObserver } from 'perf_hooks'; import { initRoutes } from './init_routes'; -function avg(items) { - return ( - items.reduce((sum, val) => { - return sum + val; - }, 0) / items.length - ); -} - export default function TaskManagerPerformanceAPI(kibana) { return new kibana.Plugin({ name: 'perfTask', @@ -27,35 +24,46 @@ export default function TaskManagerPerformanceAPI(kibana) { init(server) { const taskManager = server.plugins.task_manager; - const performanceState = { - runningAverageTasks: 0, - averagesTaken: [], - runningAverageLeadTime: -1, - averagesTakenLeadTime: [], - leadTimeQueue: [], - }; + const performanceState = resetPerfState({}); + + let lastFlush = new Date(); + function flushPerfStats() { + setTimeout(flushPerfStats, 5000); + const prevFlush = lastFlush; + lastFlush = new Date(); - setInterval(() => { const tasks = performanceState.leadTimeQueue.length; - console.log(`I have processed ${tasks} tasks in the past 5s`); + const title = `[Perf${performanceState.capturing ? ' (capturing)' : ''}]`; + const seconds = parseInt((lastFlush - prevFlush) / 1000); + console.log( + `${title} I have processed ${tasks} tasks in the past ${seconds}s (${tasks / + seconds} per second)` + ); if (tasks > 0) { - const latestAverage = avg(performanceState.leadTimeQueue.splice(0, tasks)); + const latestAverage = avg(performanceState.leadTimeQueue.splice(0, tasks)).mean; performanceState.averagesTakenLeadTime.push(latestAverage); performanceState.averagesTaken.push(tasks); if (performanceState.averagesTakenLeadTime.length > 1) { - performanceState.runningAverageLeadTime = avg(performanceState.averagesTakenLeadTime); - performanceState.runningAverageTasks = avg(performanceState.averagesTaken); + performanceState.runningAverageLeadTime = avg( + performanceState.averagesTakenLeadTime + ).mean; + performanceState.runningAverageTasksPerSecond = + avg(performanceState.averagesTaken).mean / 5; } else { performanceState.runningAverageLeadTime = latestAverage; - performanceState.runningAverageTasks = tasks; + performanceState.runningAverageTasksPerSecond = tasks / 5; } } - }, 5000); + } + + setTimeout(flushPerfStats, 5000); + + const title = 'Perf Test Task'; taskManager.registerTaskDefinitions({ performanceTestTask: { - title: 'Perf Test Task', + title, description: 'A task for stress testing task_manager.', timeout: '1m', @@ -64,27 +72,41 @@ export default function TaskManagerPerformanceAPI(kibana) { async run() { const { params, state } = taskInstance; - const runAt = millisecondsFromNow(5000); + const counter = state.counter ? state.counter : 1; + const now = Date.now(); const leadTime = now - taskInstance.runAt; performanceState.leadTimeQueue.push(leadTime); - const counter = (state.counter ? 1 + state.counter : 1); + // schedule to run next cycle as soon as possible + const runAt = calRunAt(params, counter); const stateUpdated = { ...state, - counter + counter: counter + 1, }; - if(params.trackExecutionTimeline) { - stateUpdated.timeline = stateUpdated.timeline || []; - stateUpdated.timeline.push({ - owner: taskInstance.owner.split('-')[0], - counter, - leadTime, - ranAt: now - }); + if (params.trackExecutionTimeline && state.perf && state.perf.id) { + performance.mark(`perfTask_run_${state.perf.id}_${counter}`); + performance.measure( + 'perfTask.markUntilRun', + `perfTask_markAsRunning_${state.perf.id}_${counter}`, + `perfTask_run_${state.perf.id}_${counter}` + ); + if (counter === 1) { + performance.measure( + 'perfTask.firstRun', + `perfTask_schedule_${state.perf.id}`, + `perfTask_run_${state.perf.id}_${counter}` + ); + performance.measure( + 'perfTask.firstMarkAsRunningTillRan', + `perfTask_markAsRunning_${state.perf.id}_${counter}`, + `perfTask_run_${state.perf.id}_${counter}` + ); + } } + return { state: stateUpdated, runAt, @@ -95,17 +117,249 @@ export default function TaskManagerPerformanceAPI(kibana) { }, }); - initRoutes(server, performanceState); + taskManager.addMiddleware({ + async beforeSave({ taskInstance, ...opts }) { + const modifiedInstance = { + ...taskInstance, + }; + + if (taskInstance.params && taskInstance.params.trackExecutionTimeline) { + modifiedInstance.state = modifiedInstance.state || {}; + modifiedInstance.state.perf = modifiedInstance.state.perf || {}; + modifiedInstance.state.perf.id = uuid.v4().replace(/-/gi, '_'); + performance.mark(`perfTask_schedule_${modifiedInstance.state.perf.id}`); + } + + return { + ...opts, + taskInstance: modifiedInstance, + }; + }, + + async beforeMarkRunning({ taskInstance, ...opts }) { + const modifiedInstance = { + ...taskInstance, + }; + + if ( + modifiedInstance.state && + modifiedInstance.state.perf && + modifiedInstance.state.perf.id + ) { + const { counter = 1 } = modifiedInstance.state; + performance.mark(`perfTask_markAsRunning_${modifiedInstance.state.perf.id}_${counter}`); + if (counter === 1) { + performance.measure( + 'perfTask.firstMarkAsRunning', + `perfTask_schedule_${modifiedInstance.state.perf.id}`, + `perfTask_markAsRunning_${modifiedInstance.state.perf.id}_${counter}` + ); + } else if (counter > 1) { + performance.measure( + 'perfTask.runUntilNextMarkAsRunning', + `perfTask_run_${modifiedInstance.state.perf.id}_${counter - 1}`, + `perfTask_markAsRunning_${modifiedInstance.state.perf.id}_${counter}` + ); + } + } + + return { + ...opts, + taskInstance: modifiedInstance, + }; + }, + }); + + const perfApi = { + capture() { + resetPerfState(performanceState); + performanceState.capturing = true; + performance.mark('perfTest.start'); + }, + endCapture() { + return new Promise(resolve => { + performanceState.performance.summarize.push([resolve, perfApi.summarize]); + + performance.mark('perfTest.end'); + performance.measure('perfTest.duration', 'perfTest.start', 'perfTest.end'); + }); + }, + summarize(perfTestDuration) { + const { + runningAverageTasksPerSecond, + runningAverageLeadTime, + performance, + } = performanceState; + + const { + numberOfTasksRanOverall, + elasticsearchApiCalls, + activityDuration, + sleepDuration, + cycles, + claimAvailableTasksNoTasks, + claimAvailableTasksNoAvailableWorkers, + taskPoolAttemptToRun, + taskRunnerMarkTaskAsRunning + } = performance; + + const perfRes = { + perfTestDuration: prettyMilliseconds(perfTestDuration), + runningAverageTasksPerSecond, + runningAverageLeadTime, + numberOfTasksRanOverall, + claimAvailableTasksNoTasks, + claimAvailableTasksNoAvailableWorkers, + elasticsearchApiCalls: _.mapValues(elasticsearchApiCalls, avg), + sleepDuration: prettyMilliseconds(stats.sum(sleepDuration)), + activityDuration: prettyMilliseconds(stats.sum(activityDuration)), + cycles, + taskPoolAttemptToRun: avg(taskPoolAttemptToRun), + taskRunnerMarkTaskAsRunning: avg(taskRunnerMarkTaskAsRunning), + }; + + resetPerfState(performanceState); + + return perfRes; + }, + }; + + initRoutes(server, perfApi); }, }); } -function millisecondsFromNow(ms) { - if (!ms) { - return; +function calRunAt(params, counter) { + const runAt = counter === 1 ? new Date(params.startAt) : new Date(); + return runAt.getTime() < params.runUntil ? runAt : undefined; +} + +function avg(items) { + const mode = stats.mode(items); + return { + mean: parseInt(stats.mean(items)), + range: { + min: parseInt(typeof mode === 'number' ? mode : _.min([...mode])), + max: parseInt(typeof mode === 'number' ? mode : _.max([...mode])), + }, + }; +} + +function resetPerfState(target) { + if(target.performanceObserver) { + target.performanceObserver.disconnect(); } - const dt = new Date(); - dt.setTime(dt.getTime() + ms); - return dt; + const performanceState = Object.assign(target, { + capturing: false, + runningAverageTasksPerSecond: 0, + averagesTaken: [], + runningAverageLeadTime: -1, + averagesTakenLeadTime: [], + leadTimeQueue: [], + performance: { + numberOfTasksRanOverall: 0, + cycles: { + fillPoolStarts: 0, + fillPoolCycles: 0, + fillPoolBail: 0, + fillPoolBailNoTasks: 0, + }, + claimAvailableTasksNoTasks: 0, + claimAvailableTasksNoAvailableWorkers: 0, + elasticsearchApiCalls: { + timeUntilFirstRun: [], + timeUntilFirstMarkAsRun: [], + firstMarkAsRunningTillRan: [], + timeFromMarkAsRunTillRun: [], + timeFromRunTillNextMarkAsRun: [], + claimAvailableTasks: [], + }, + activityDuration: [], + sleepDuration: [], + taskPollerActivityDurationPreScheduleComplete: [], + taskPoolAttemptToRun: [], + taskRunnerMarkTaskAsRunning: [], + + summarize: [] + }, + }); + + performanceState.performanceObserver = new PerformanceObserver((list, observer) => { + list.getEntries().forEach(entry => { + const { name, duration } = entry; + switch (name) { + // Elasticsearch Api Calls + case 'perfTask.firstRun': + performanceState.performance.elasticsearchApiCalls.timeUntilFirstRun.push(duration); + break; + case 'perfTask.firstMarkAsRunning': + performanceState.performance.elasticsearchApiCalls.timeUntilFirstMarkAsRun.push(duration); + break; + case 'perfTask.firstMarkAsRunningTillRan': + performanceState.performance.elasticsearchApiCalls.firstMarkAsRunningTillRan.push(duration); + break; + case 'perfTask.markUntilRun': + performanceState.performance.elasticsearchApiCalls.timeFromMarkAsRunTillRun.push(duration); + break; + case 'perfTask.runUntilNextMarkAsRunning': + performanceState.performance.elasticsearchApiCalls.timeFromRunTillNextMarkAsRun.push(duration); + break; + case 'claimAvailableTasks': + performanceState.performance.elasticsearchApiCalls.claimAvailableTasks.push(duration); + break; + case 'TaskPoller.sleepDuration': + performanceState.performance.sleepDuration.push(duration); + break; + case 'fillPool.activityDurationUntilNoTasks': + performanceState.performance.activityDuration.push(duration); + break; + case 'fillPool.activityDurationUntilExhaustedCapacity': + performanceState.performance.activityDuration.push(duration); + break; + case 'fillPool.bailExhaustedCapacity': + performanceState.performance.cycles.fillPoolBail++; + break; + case 'fillPool.bailNoTasks': + performanceState.performance.cycles.fillPoolBail++; + performanceState.performance.cycles.fillPoolBailNoTasks++; + break; + case 'fillPool.start': + performanceState.performance.cycles.fillPoolStarts++; + break; + case 'fillPool.cycle': + performanceState.performance.cycles.fillPoolCycles++; + break; + break; + case 'claimAvailableTasks.noTasks': + performanceState.performance.claimAvailableTasksNoTasks++; + break; + case 'claimAvailableTasks.noAvailableWorkers': + performanceState.performance.claimAvailableTasksNoAvailableWorkers++; + break; + case 'taskPool.attemptToRun': + performanceState.performance.taskPoolAttemptToRun.push(duration); + break; + case 'taskRunner.markTaskAsRunning': + performanceState.performance.taskRunnerMarkTaskAsRunning.push(duration); + break; + case 'perfTest.duration': + observer.disconnect(); + const { summarize } = performanceState.performance; + if(summarize && summarize.length) { + summarize.splice(0, summarize.length).forEach(([resolve, summarize]) => { + resolve(summarize(duration)); + }); + } + break; + default: + if (name.startsWith('perfTask_run_')) { + performanceState.performance.numberOfTasksRanOverall++; + } + } + }); + }); + performanceState.performanceObserver.observe({ entryTypes: ['measure', 'mark'] }); + + return performanceState; } diff --git a/x-pack/test/plugin_api_perf/plugins/task_manager_performance/init_routes.js b/x-pack/test/plugin_api_perf/plugins/task_manager_performance/init_routes.js index b9fe788093d11..ca6d8707f5c58 100644 --- a/x-pack/test/plugin_api_perf/plugins/task_manager_performance/init_routes.js +++ b/x-pack/test/plugin_api_perf/plugins/task_manager_performance/init_routes.js @@ -5,6 +5,7 @@ */ import Joi from 'joi'; +import { range, chunk } from 'lodash'; const scope = 'perf-testing'; export function initRoutes(server, performanceState) { @@ -18,32 +19,56 @@ export function initRoutes(server, performanceState) { payload: Joi.object({ tasksToSpawn: Joi.number().required(), durationInSeconds: Joi.number().required(), - trackExecutionTimeline: Joi.boolean().default(false).required(), + trackExecutionTimeline: Joi.boolean() + .default(false) + .required(), }), }, }, async handler(request) { - const { tasksToSpawn, durationInSeconds, trackExecutionTimeline } = request.payload; - const tasks = []; + performanceState.capture(); - for (let taskIndex = 0; taskIndex < tasksToSpawn; taskIndex++) { - tasks.push( - await taskManager.schedule( - { - taskType: 'performanceTestTask', - params: { taskIndex, trackExecutionTimeline }, - scope: [scope], - }, - { request } + const { tasksToSpawn, durationInSeconds, trackExecutionTimeline } = request.payload; + const startAt = millisecondsFromNow(5000).getTime(); + await chunk(range(tasksToSpawn), 200) + .map(chunkOfTasksToSpawn => () => + Promise.all( + chunkOfTasksToSpawn.map(taskIndex => + taskManager.schedule( + { + taskType: 'performanceTestTask', + params: { + startAt, + taskIndex, + trackExecutionTimeline, + runUntil: millisecondsFromNow(durationInSeconds * 1000).getTime(), + }, + scope: [scope], + }, + { request } + ) + ) ) - ); - } + ) + .reduce((chain, nextExecutor) => { + return chain.then(() => nextExecutor()); + }, Promise.resolve()); return new Promise(resolve => { setTimeout(() => { - resolve(performanceState); - }, durationInSeconds * 1000); + performanceState.endCapture().then(resolve); + }, durationInSeconds * 1000 + 10000 /* wait extra 10s to drain queue */); }); }, }); } + +function millisecondsFromNow(ms) { + if (!ms) { + return; + } + + const dt = new Date(); + dt.setTime(dt.getTime() + ms); + return dt; +} diff --git a/x-pack/test/plugin_api_perf/plugins/task_manager_performance/package.json b/x-pack/test/plugin_api_perf/plugins/task_manager_performance/package.json index 22ee21852aeee..7d46d6b0f3cca 100644 --- a/x-pack/test/plugin_api_perf/plugins/task_manager_performance/package.json +++ b/x-pack/test/plugin_api_perf/plugins/task_manager_performance/package.json @@ -7,6 +7,10 @@ }, "license": "Apache-2.0", "dependencies": { - "joi": "^13.5.2" + "lodash": "^4.17.15", + "uuid": "3.3.2", + "joi": "^13.5.2", + "stats-lite": "2.2.0", + "pretty-ms": "5.0.0" } } diff --git a/x-pack/test/plugin_api_perf/test_suites/task_manager/task_manager_perf_integration.ts b/x-pack/test/plugin_api_perf/test_suites/task_manager/task_manager_perf_integration.ts index e8dbf7c509287..d5caff5f9d10b 100644 --- a/x-pack/test/plugin_api_perf/test_suites/task_manager/task_manager_perf_integration.ts +++ b/x-pack/test/plugin_api_perf/test_suites/task_manager/task_manager_perf_integration.ts @@ -10,23 +10,151 @@ export default function({ getService }: { getService: (service: string) => any } const log = getService('log'); const supertest = getService('supertest'); + const params = { tasksToSpawn: 100, trackExecutionTimeline: true, durationInSeconds: 60 }; describe('stressing task manager', () => { - it('should run 10 tasks every second for a minute', async () => { - const { runningAverageTasks, runningAverageLeadTime } = await supertest + it(`should run ${params.tasksToSpawn} tasks over ${params.durationInSeconds} seconds`, async () => { + const { + runningAverageTasksPerSecond, + runningAverageLeadTime, + // how often things happen in Task Manager + cycles: { fillPoolStarts, fillPoolCycles, fillPoolBail, fillPoolBailNoTasks }, + claimAvailableTasksNoTasks, + claimAvailableTasksNoAvailableWorkers, + numberOfTasksRanOverall, + // how long it takes to talk to Elasticsearch + elasticsearchApiCalls: { + timeUntilFirstMarkAsRun, + firstMarkAsRunningTillRan, + timeFromMarkAsRunTillRun, + timeFromRunTillNextMarkAsRun, + claimAvailableTasks, + }, + // durations in Task Manager + perfTestDuration, + taskPoolAttemptToRun, + taskRunnerMarkTaskAsRunning, + sleepDuration, + activityDuration, + } = await supertest .post('/api/perf_tasks') .set('kbn-xsrf', 'xxx') - .send({ tasksToSpawn: 20, trackExecutionTimeline: true, durationInSeconds: 60 }) + .send(params) .expect(200) .then((response: any) => response.body); - log.debug(`Stress Test Result:`); - log.debug(`Average number of tasks executed per second: ${runningAverageTasks}`); - log.debug( - `Average time it took from the moment a task's scheduled time was reached, until Task Manager picked it up: ${runningAverageLeadTime}` + log.info(cyan(`Stress Test Result:`)); + log.info( + `Average number of tasks executed per second: ${bright(runningAverageTasksPerSecond)}` ); + log.info( + `Average time between a task's "runAt" scheduled time and the time it actually ran: ${bright( + runningAverageLeadTime + )}` + ); + + if (params.trackExecutionTimeline) { + log.info( + `Overall number of tasks ran in ${bright(params.durationInSeconds)} seconds: ${bright( + numberOfTasksRanOverall + )}` + ); + log.info(`Average time between stages:`); + log.info( + `Schedule ---[${descMetric( + timeUntilFirstMarkAsRun + )}]--> first markAsRunning ---[${descMetric(firstMarkAsRunningTillRan)}]--> first run` + ); + log.info( + `markAsRunning ---[${descMetric(timeFromMarkAsRunTillRun)}]--> run ---[${descMetric( + timeFromRunTillNextMarkAsRun + )}]---> next markAsRunning` + ); + log.info(`Duration of Perf Test: ${bright(perfTestDuration)}`); + log.info(`Activity within Task Poller: ${bright(activityDuration)}`); + log.info(`Inactivity due to Sleep: ${bright(sleepDuration)}`); + log.info(`Polling Cycles: ${colorizeCycles(fillPoolStarts, fillPoolCycles, fillPoolBail)}`); + if (fillPoolBail > 0) { + log.info(` ⮑ Bailed due to:`); + if (fillPoolBailNoTasks > 0) { + log.info(` ⮑ No Tasks To Process:`); + if (claimAvailableTasksNoTasks > 0) { + log.info(` ⮑ ${claimAvailableTasksNoTasks} Times, due to No Tasks Claimed`); + } + if (claimAvailableTasksNoAvailableWorkers > 0) { + log.info( + ` ⮑ ${claimAvailableTasksNoAvailableWorkers} Times, due to having No Available Worker Capacity` + ); + } + } + if (fillPoolBail - fillPoolBailNoTasks > 0) { + log.info( + ` ⮑ Exhausted Available Workers due to on going Task runs ${fillPoolBail - + fillPoolBailNoTasks}` + ); + } + } + log.info( + `average duration taken to Claim Available Tasks: ${descMetric(claimAvailableTasks)}` + ); + log.info( + `average duration taken to Mark claimed Tasks as Running in Task Pool: ${descMetric( + taskPoolAttemptToRun + )}` + ); + log.info( + `average duration taken to Mark individual Tasks as Running in Task Runner: ${descMetric( + taskRunnerMarkTaskAsRunning + )}` + ); + } - expect(runningAverageTasks).to.be.greaterThan(0); + expect(runningAverageTasksPerSecond).to.be.greaterThan(0); expect(runningAverageLeadTime).to.be.greaterThan(0); }); }); } + +function descMetric(metric: { mean: number; range: { min: number; max: number } }): string { + return `${colorize(metric.mean)}ms ${dim(`(`)}${colorize(metric.range.min)}${dim( + `ms - ` + )}${colorize(metric.range.max)}${dim(`ms)`)}`; +} + +function colorize(avg: number) { + if (!avg) { + return red('?'); + } + return avg < 500 ? green(`${avg}`) : avg < 1000 ? cyan(`${avg}`) : red(`${avg}`); +} + +function colorizeCycles(fillPoolStarts: number, fillPoolCycles: number, fillPoolBail: number) { + const perc = (fillPoolCycles * 100) / fillPoolStarts; + const colorFunc = perc >= 100 ? green : perc >= 50 ? cyan : red; + return ( + `ran ` + + bright(`${fillPoolStarts}`) + + ` cycles, of which ` + + colorFunc(`${fillPoolCycles}`) + + ` were reran before bailing` + ); +} + +function cyan(str: string) { + return `\x1b[36m${str}\x1b[0m`; +} + +function red(str: string) { + return `\x1b[31m${str}\x1b[0m`; +} + +function green(str: string) { + return `\x1b[32m${str}\x1b[0m`; +} + +function dim(str: string) { + return `\x1b[2m${str}\x1b[0m`; +} + +function bright(str: string | number) { + return `\x1b[1m${str}\x1b[0m`; +} diff --git a/yarn.lock b/yarn.lock index 185152e0d0a34..41b7b5eac1110 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5982,6 +5982,14 @@ babel-plugin-emotion@^9.2.11: source-map "^0.5.7" touch "^2.0.1" +babel-plugin-filter-imports@^3.0.0: + version "3.0.0" + resolved "https://registry.yarnpkg.com/babel-plugin-filter-imports/-/babel-plugin-filter-imports-3.0.0.tgz#a849683837ad29960da17492fb32789ab6b09a11" + integrity sha512-p/chjzVTgCxUqyLM0q/pfWVZS7IJTwGQMwNg0LOvuQpKiTftQgZDtkGB8XvETnUw19rRcL7bJCTopSwibTN2tA== + dependencies: + "@babel/types" "^7.4.0" + lodash "^4.17.11" + babel-plugin-istanbul@^5.1.0: version "5.1.1" resolved "https://registry.yarnpkg.com/babel-plugin-istanbul/-/babel-plugin-istanbul-5.1.1.tgz#7981590f1956d75d67630ba46f0c22493588c893"