Skip to content

Commit

Permalink
Improves performance of task execution in Task manager
Browse files Browse the repository at this point in the history
This PR include three key changes:

1. Run tasks as soon as they have been marked as running, rather than wait for the whole batch to me marked
2. Use a custom refresh setting of refresh: false where possible, in place of wait_for, in order to speed up Task Manager's internal workflow
3. Instrumentation of Task Manager exposing Activity / Inactivity metrics in Performance test runs
  • Loading branch information
gmmorris authored Nov 6, 2019
1 parent 17383b8 commit c485893
Show file tree
Hide file tree
Showing 25 changed files with 894 additions and 478 deletions.
30 changes: 25 additions & 5 deletions packages/kbn-babel-preset/node_preset.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,25 @@
*/

module.exports = (_, options = {}) => {
const overrides = [];
if (!process.env.ALLOW_PERFORMANCE_HOOKS_IN_TASK_MANAGER) {
overrides.push(
{
test: [/x-pack[\/\\]legacy[\/\\]plugins[\/\\]task_manager/],
plugins: [
[
'filter-imports',
{
imports: {
perf_hooks: ['performance'],
},
},
],
],
}
);
}

return {
presets: [
[
Expand All @@ -39,7 +58,7 @@ module.exports = (_, options = {}) => {
modules: 'cjs',
corejs: 3,

...(options['@babel/preset-env'] || {})
...(options['@babel/preset-env'] || {}),
},
],
require('./common_preset'),
Expand All @@ -48,9 +67,10 @@ module.exports = (_, options = {}) => {
[
require.resolve('babel-plugin-transform-define'),
{
'global.__BUILT_WITH_BABEL__': 'true'
}
]
]
'global.__BUILT_WITH_BABEL__': 'true',
},
],
],
overrides,
};
};
3 changes: 2 additions & 1 deletion packages/kbn-babel-preset/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
"@babel/plugin-syntax-dynamic-import": "^7.2.0",
"@babel/plugin-transform-modules-commonjs": "^7.5.0",
"@babel/preset-env": "^7.5.5",
"@babel/preset-react":"^7.0.0",
"@babel/preset-react": "^7.0.0",
"@babel/preset-typescript": "^7.3.3",
"@kbn/elastic-idx": "1.0.0",
"babel-plugin-add-module-exports": "^1.0.2",
"babel-plugin-filter-imports": "^3.0.0",
"babel-plugin-transform-define": "^1.3.1",
"babel-plugin-typescript-strip-namespaces": "^1.1.1"
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/.i18nrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"xpack.server": "legacy/server",
"xpack.snapshotRestore": "legacy/plugins/snapshot_restore",
"xpack.spaces": ["legacy/plugins/spaces", "plugins/spaces"],
"xpack.taskManager": "legacy/plugins/task_manager",
"xpack.transform": "legacy/plugins/transform",
"xpack.upgradeAssistant": "legacy/plugins/upgrade_assistant",
"xpack.uptime": "legacy/plugins/uptime",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@
// Which is basically the Haskel equivalent of Rust/ML/Scala's Result
// I'll reach out to other's in Kibana to see if we can merge these into one type

// eslint-disable-next-line @typescript-eslint/consistent-type-definitions
export type Ok<T> = {
export interface Ok<T> {
tag: 'ok';
value: T;
};
// eslint-disable-next-line @typescript-eslint/consistent-type-definitions
export type Err<E> = {
}

export interface Err<E> {
tag: 'err';
error: E;
};
}
export type Result<T, E> = Ok<T> | Err<E>;

export function asOk<T>(value: T): Ok<T> {
Expand Down
11 changes: 6 additions & 5 deletions x-pack/legacy/plugins/task_manager/lib/fill_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
import _ from 'lodash';
import sinon from 'sinon';
import { fillPool } from './fill_pool';
import { TaskPoolRunResult } from '../task_pool';

describe('fillPool', () => {
test('stops filling when there are no more tasks in the store', async () => {
const tasks = [[1, 2, 3], [4, 5]];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const run = sinon.spy(async () => true);
const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks);
const converter = _.identity;

await fillPool(run, fetchAvailableTasks, converter);
Expand All @@ -25,7 +26,7 @@ describe('fillPool', () => {
const tasks = [[1, 2, 3], [4, 5]];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const run = sinon.spy(async () => false);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = _.identity;

await fillPool(run, fetchAvailableTasks, converter);
Expand All @@ -37,7 +38,7 @@ describe('fillPool', () => {
const tasks = [[1, 2, 3], [4, 5]];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const run = sinon.spy(async () => false);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => x.toString();

await fillPool(run, fetchAvailableTasks, converter);
Expand All @@ -47,7 +48,7 @@ describe('fillPool', () => {

describe('error handling', () => {
test('throws exception from fetchAvailableTasks', async () => {
const run = sinon.spy(async () => false);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => x.toString();

try {
Expand Down Expand Up @@ -80,7 +81,7 @@ describe('fillPool', () => {
const tasks = [[1, 2, 3], [4, 5]];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const run = sinon.spy(async () => false);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => {
throw new Error(`can not convert ${x}`);
};
Expand Down
32 changes: 27 additions & 5 deletions x-pack/legacy/plugins/task_manager/lib/fill_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@
* you may not use this file except in compliance with the Elastic License.
*/

type BatchRun<T> = (tasks: T[]) => Promise<boolean>;
import { performance } from 'perf_hooks';
import { TaskPoolRunResult } from '../task_pool';

export enum FillPoolResult {
NoTasksClaimed = 'NoTasksClaimed',
RanOutOfCapacity = 'RanOutOfCapacity',
}

type BatchRun<T> = (tasks: T[]) => Promise<TaskPoolRunResult>;
type Fetcher<T> = () => Promise<T[]>;
type Converter<T1, T2> = (t: T1) => T2;

Expand All @@ -24,18 +32,32 @@ export async function fillPool<TRecord, TRunner>(
run: BatchRun<TRunner>,
fetchAvailableTasks: Fetcher<TRecord>,
converter: Converter<TRecord, TRunner>
): Promise<void> {
): Promise<FillPoolResult> {
performance.mark('fillPool.start');
while (true) {
const instances = await fetchAvailableTasks();

if (!instances.length) {
return;
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return FillPoolResult.NoTasksClaimed;
}

const tasks = instances.map(converter);

if (!(await run(tasks))) {
return;
if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) {
performance.mark('fillPool.bailExhaustedCapacity');
performance.measure(
'fillPool.activityDurationUntilExhaustedCapacity',
'fillPool.start',
'fillPool.bailExhaustedCapacity'
);
return FillPoolResult.RanOutOfCapacity;
}
performance.mark('fillPool.cycle');
}
}
6 changes: 6 additions & 0 deletions x-pack/legacy/plugins/task_manager/lib/middleware.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,23 @@ describe('addMiddlewareToChain', () => {
return opts;
},
beforeRun: defaultBeforeRun,
beforeMarkRunning: defaultBeforeRun,
};
const m2 = {
beforeSave: async (opts: BeforeSaveOpts) => {
Object.assign(opts.taskInstance.params, { m2: true });
return opts;
},
beforeRun: defaultBeforeRun,
beforeMarkRunning: defaultBeforeRun,
};
const m3 = {
beforeSave: async (opts: BeforeSaveOpts) => {
Object.assign(opts.taskInstance.params, { m3: true });
return opts;
},
beforeRun: defaultBeforeRun,
beforeMarkRunning: defaultBeforeRun,
};

let middlewareChain;
Expand Down Expand Up @@ -119,6 +122,7 @@ describe('addMiddlewareToChain', () => {
m1: true,
};
},
beforeMarkRunning: defaultBeforeRun,
};
const m2 = {
beforeSave: defaultBeforeSave,
Expand All @@ -128,6 +132,7 @@ describe('addMiddlewareToChain', () => {
m2: true,
};
},
beforeMarkRunning: defaultBeforeRun,
};
const m3 = {
beforeSave: defaultBeforeSave,
Expand All @@ -137,6 +142,7 @@ describe('addMiddlewareToChain', () => {
m3: true,
};
},
beforeMarkRunning: defaultBeforeRun,
};

let middlewareChain;
Expand Down
8 changes: 8 additions & 0 deletions x-pack/legacy/plugins/task_manager/lib/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ export type BeforeSaveFunction = (
) => Promise<BeforeSaveMiddlewareParams>;

export type BeforeRunFunction = (params: RunContext) => Promise<RunContext>;
export type BeforeMarkRunningFunction = (params: RunContext) => Promise<RunContext>;

export interface Middleware {
beforeSave: BeforeSaveFunction;
beforeRun: BeforeRunFunction;
beforeMarkRunning: BeforeMarkRunningFunction;
}

export function addMiddlewareToChain(prevMiddleware: Middleware, middleware: Middleware) {
Expand All @@ -39,8 +41,14 @@ export function addMiddlewareToChain(prevMiddleware: Middleware, middleware: Mid
? (params: RunContext) => middleware.beforeRun(params).then(prevMiddleware.beforeRun)
: prevMiddleware.beforeRun;

const beforeMarkRunning = middleware.beforeMarkRunning
? (params: RunContext) =>
middleware.beforeMarkRunning(params).then(prevMiddleware.beforeMarkRunning)
: prevMiddleware.beforeMarkRunning;

return {
beforeSave,
beforeRun,
beforeMarkRunning,
};
}
7 changes: 6 additions & 1 deletion x-pack/legacy/plugins/task_manager/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ describe('TaskManager', () => {
const middleware = {
beforeSave: async (saveOpts: any) => saveOpts,
beforeRun: async (runOpts: any) => runOpts,
beforeMarkRunning: async (runOpts: any) => runOpts,
};
expect(() => client.addMiddleware(middleware)).not.toThrow();
});
Expand All @@ -183,6 +184,7 @@ describe('TaskManager', () => {
const middleware = {
beforeSave: async (saveOpts: any) => saveOpts,
beforeRun: async (runOpts: any) => runOpts,
beforeMarkRunning: async (runOpts: any) => runOpts,
};

client.start();
Expand Down Expand Up @@ -241,7 +243,10 @@ describe('TaskManager', () => {

claimAvailableTasks(claim, 10, logger);

sinon.assert.calledWithMatch(logger.warn, /inline scripts/);
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn.mock.calls[0][0]).toMatchInlineSnapshot(
`"Task Manager cannot operate when inline scripts are disabled in Elasticsearch"`
);
});
});
});
29 changes: 21 additions & 8 deletions x-pack/legacy/plugins/task_manager/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { performance } from 'perf_hooks';
import { SavedObjectsClientContract, SavedObjectsSerializer } from 'src/core/server';
import { Logger } from './types';
import { fillPool } from './lib/fill_pool';
import { fillPool, FillPoolResult } from './lib/fill_pool';
import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware';
import { sanitizeTaskDefinitions } from './lib/sanitize_task_definitions';
import { intervalFromNow } from './lib/intervals';
Expand Down Expand Up @@ -56,13 +56,14 @@ export class TaskManager {
private readonly pollerInterval: number;
private definitions: TaskDictionary<TaskDefinition>;
private store: TaskStore;
private poller: TaskPoller;
private poller: TaskPoller<FillPoolResult>;
private logger: Logger;
private pool: TaskPool;
private startQueue: Array<() => void> = [];
private middleware = {
beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts,
beforeRun: async (runOpts: RunContext) => runOpts,
beforeMarkRunning: async (runOpts: RunContext) => runOpts,
};

/**
Expand All @@ -86,8 +87,6 @@ export class TaskManager {
this.logger.info(`TaskManager is identified by the Kibana UUID: ${taskManagerId}`);
}

/* Kibana UUID needs to be pulled live (not cached), as it takes a long time
* to initialize, and can change after startup */
const store = new TaskStore({
serializer: opts.serializer,
savedObjectsRepository: opts.savedObjectsRepository,
Expand All @@ -109,13 +108,14 @@ export class TaskManager {
store,
definitions: this.definitions,
beforeRun: this.middleware.beforeRun,
beforeMarkRunning: this.middleware.beforeMarkRunning,
});
const poller = new TaskPoller({
const poller = new TaskPoller<FillPoolResult>({
logger: this.logger,
pollInterval: opts.config.get('xpack.task_manager.poll_interval'),
work: (): Promise<void> =>
work: (): Promise<FillPoolResult> =>
fillPool(
pool.run,
async tasks => await pool.run(tasks),
() =>
claimAvailableTasks(
this.store.claimAvailableTasks.bind(this.store),
Expand Down Expand Up @@ -260,12 +260,24 @@ export async function claimAvailableTasks(
logger: Logger
) {
if (availableWorkers > 0) {
performance.mark('claimAvailableTasks_start');

try {
const { docs, claimedTasks } = await claim({
size: availableWorkers,
claimOwnershipUntil: intervalFromNow('30s')!,
});

if (claimedTasks === 0) {
performance.mark('claimAvailableTasks.noTasks');
}
performance.mark('claimAvailableTasks_stop');
performance.measure(
'claimAvailableTasks',
'claimAvailableTasks_start',
'claimAvailableTasks_stop'
);

if (docs.length !== claimedTasks) {
logger.warn(
`[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched`
Expand All @@ -282,6 +294,7 @@ export async function claimAvailableTasks(
}
}
} else {
performance.mark('claimAvailableTasks.noAvailableWorkers');
logger.info(
`[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers. If this happens often, consider adjusting the "xpack.task_manager.max_workers" configuration.`
);
Expand Down
Loading

0 comments on commit c485893

Please sign in to comment.