From a342260d5d094a8c7384d599d6121d08cdeab1a6 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Tue, 21 Jul 2020 14:08:29 +0100 Subject: [PATCH] [Task Manager] Batches the update operations in Task Manager (#71470) This PR attempts to batch update tasks in Task Manager in order to avoid overloading the Elasticsearch queue. This is the 1st PR addressing https://github.com/elastic/kibana/issues/65551 Under the hood we now use a Reactive buffer accumulates all calls to the `update` api in the TaskStore and flushes after 50ms or when as many operations as there are workers have been buffered (whichever comes first). --- .../server/buffered_task_store.test.ts | 82 +++++ .../server/buffered_task_store.ts | 39 +++ .../server/lib/bulk_operation_buffer.test.ts | 288 ++++++++++++++++++ .../server/lib/bulk_operation_buffer.ts | 129 ++++++++ .../server/lib/result_type.test.ts | 27 ++ .../task_manager/server/lib/result_type.ts | 19 ++ .../task_manager/server/task_manager.ts | 10 +- .../task_manager/server/task_runner.ts | 2 +- .../task_manager/server/task_store.mock.ts | 31 ++ .../plugins/task_manager/server/task_store.ts | 65 +++- 10 files changed, 685 insertions(+), 7 deletions(-) create mode 100644 x-pack/plugins/task_manager/server/buffered_task_store.test.ts create mode 100644 x-pack/plugins/task_manager/server/buffered_task_store.ts create mode 100644 x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts create mode 100644 x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts create mode 100644 x-pack/plugins/task_manager/server/lib/result_type.test.ts create mode 100644 x-pack/plugins/task_manager/server/task_store.mock.ts diff --git a/x-pack/plugins/task_manager/server/buffered_task_store.test.ts b/x-pack/plugins/task_manager/server/buffered_task_store.test.ts new file mode 100644 index 0000000000000..8e18405c79ed2 --- /dev/null +++ b/x-pack/plugins/task_manager/server/buffered_task_store.test.ts @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import uuid from 'uuid'; +import { taskStoreMock } from './task_store.mock'; +import { BufferedTaskStore } from './buffered_task_store'; +import { asErr, asOk } from './lib/result_type'; +import { TaskStatus } from './task'; + +describe('Buffered Task Store', () => { + test('proxies the TaskStore for `maxAttempts` and `remove`', async () => { + const taskStore = taskStoreMock.create({ maxAttempts: 10 }); + taskStore.bulkUpdate.mockResolvedValue([]); + const bufferedStore = new BufferedTaskStore(taskStore, {}); + + expect(bufferedStore.maxAttempts).toEqual(10); + + bufferedStore.remove('1'); + expect(taskStore.remove).toHaveBeenCalledWith('1'); + }); + + describe('update', () => { + test("proxies the TaskStore's `bulkUpdate`", async () => { + const taskStore = taskStoreMock.create({ maxAttempts: 10 }); + const bufferedStore = new BufferedTaskStore(taskStore, {}); + + const task = mockTask(); + + taskStore.bulkUpdate.mockResolvedValue([asOk(task)]); + + expect(await bufferedStore.update(task)).toMatchObject(task); + expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]); + }); + + test('handles partially successfull bulkUpdates resolving each call appropriately', async () => { + const taskStore = taskStoreMock.create({ maxAttempts: 10 }); + const bufferedStore = new BufferedTaskStore(taskStore, {}); + + const tasks = [mockTask(), mockTask(), mockTask()]; + + taskStore.bulkUpdate.mockResolvedValueOnce([ + asOk(tasks[0]), + asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }), + asOk(tasks[2]), + ]); + + const results = [ + bufferedStore.update(tasks[0]), + bufferedStore.update(tasks[1]), + bufferedStore.update(tasks[2]), + ]; + expect(await results[0]).toMatchObject(tasks[0]); + expect(results[1]).rejects.toMatchInlineSnapshot( + `[Error: Oh no, something went terribly wrong]` + ); + expect(await results[2]).toMatchObject(tasks[2]); + }); + }); +}); + +function mockTask() { + return { + id: `task_${uuid.v4()}`, + attempts: 0, + schedule: undefined, + params: { hello: 'world' }, + retryAt: null, + runAt: new Date(), + scheduledAt: new Date(), + scope: undefined, + startedAt: null, + state: { foo: 'bar' }, + status: TaskStatus.Idle, + taskType: 'report', + user: undefined, + version: '123', + ownerId: '123', + }; +} diff --git a/x-pack/plugins/task_manager/server/buffered_task_store.ts b/x-pack/plugins/task_manager/server/buffered_task_store.ts new file mode 100644 index 0000000000000..e1e5f802204c1 --- /dev/null +++ b/x-pack/plugins/task_manager/server/buffered_task_store.ts @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { TaskStore } from './task_store'; +import { ConcreteTaskInstance } from './task'; +import { Updatable } from './task_runner'; +import { createBuffer, Operation, BufferOptions } from './lib/bulk_operation_buffer'; +import { unwrapPromise } from './lib/result_type'; + +// by default allow updates to be buffered for up to 50ms +const DEFAULT_BUFFER_MAX_DURATION = 50; + +export class BufferedTaskStore implements Updatable { + private bufferedUpdate: Operation; + constructor(private readonly taskStore: TaskStore, options: BufferOptions) { + this.bufferedUpdate = createBuffer( + (docs) => taskStore.bulkUpdate(docs), + { + bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION, + ...options, + } + ); + } + + public get maxAttempts(): number { + return this.taskStore.maxAttempts; + } + + public async update(doc: ConcreteTaskInstance): Promise { + return unwrapPromise(this.bufferedUpdate(doc)); + } + + public async remove(id: string): Promise { + return this.taskStore.remove(id); + } +} diff --git a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts new file mode 100644 index 0000000000000..9293656233026 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts @@ -0,0 +1,288 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { createBuffer, Entity, OperationError, BulkOperation } from './bulk_operation_buffer'; +import { mapErr, asOk, asErr, Ok, Err } from './result_type'; + +interface TaskInstance extends Entity { + attempts: number; +} + +const createTask = (function (): () => TaskInstance { + let counter = 0; + return () => ({ + id: `task ${++counter}`, + attempts: 1, + }); +})(); + +function incrementAttempts(task: TaskInstance): Ok { + return asOk({ + ...task, + attempts: task.attempts + 1, + }); +} + +function errorAttempts(task: TaskInstance): Err> { + return asErr({ + entity: incrementAttempts(task).value, + error: { name: '', message: 'Oh no, something went terribly wrong', statusCode: 500 }, + }); +} + +describe('Bulk Operation Buffer', () => { + describe('createBuffer()', () => { + test('batches up multiple Operation calls', async () => { + const bulkUpdate: jest.Mocked> = jest.fn( + ([task1, task2]) => { + return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]); + } + ); + + const bufferedUpdate = createBuffer(bulkUpdate); + + const task1 = createTask(); + const task2 = createTask(); + + expect(await Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)])).toMatchObject([ + incrementAttempts(task1), + incrementAttempts(task2), + ]); + expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]); + }); + + test('batch updates are executed at most by the next Event Loop tick by default', async () => { + const bulkUpdate: jest.Mocked> = jest.fn((tasks) => { + return Promise.resolve(tasks.map(incrementAttempts)); + }); + + const bufferedUpdate = createBuffer(bulkUpdate); + + const task1 = createTask(); + const task2 = createTask(); + const task3 = createTask(); + const task4 = createTask(); + const task5 = createTask(); + const task6 = createTask(); + + return new Promise((resolve) => { + Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)]).then((_) => { + expect(bulkUpdate).toHaveBeenCalledTimes(1); + expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]); + expect(bulkUpdate).not.toHaveBeenCalledWith([task3, task4]); + }); + + setTimeout(() => { + // on next tick + setTimeout(() => { + // on next tick + expect(bulkUpdate).toHaveBeenCalledTimes(2); + Promise.all([bufferedUpdate(task5), bufferedUpdate(task6)]).then((_) => { + expect(bulkUpdate).toHaveBeenCalledTimes(3); + expect(bulkUpdate).toHaveBeenCalledWith([task5, task6]); + resolve(); + }); + }, 0); + + expect(bulkUpdate).toHaveBeenCalledTimes(1); + Promise.all([bufferedUpdate(task3), bufferedUpdate(task4)]).then((_) => { + expect(bulkUpdate).toHaveBeenCalledTimes(2); + expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]); + }); + }, 0); + }); + }); + + test('batch updates can be customised to execute after a certain period', async () => { + const bulkUpdate: jest.Mocked> = jest.fn((tasks) => { + return Promise.resolve(tasks.map(incrementAttempts)); + }); + + const bufferMaxDuration = 50; + const bufferedUpdate = createBuffer(bulkUpdate, { bufferMaxDuration }); + + const task1 = createTask(); + const task2 = createTask(); + const task3 = createTask(); + const task4 = createTask(); + const task5 = createTask(); + const task6 = createTask(); + + return new Promise((resolve) => { + Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)]).then((_) => { + expect(bulkUpdate).toHaveBeenCalledTimes(1); + expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]); + expect(bulkUpdate).not.toHaveBeenCalledWith([task3, task4]); + }); + + setTimeout(() => { + // on next tick + setTimeout(() => { + // on next tick + expect(bulkUpdate).toHaveBeenCalledTimes(2); + Promise.all([bufferedUpdate(task5), bufferedUpdate(task6)]).then((_) => { + expect(bulkUpdate).toHaveBeenCalledTimes(3); + expect(bulkUpdate).toHaveBeenCalledWith([task5, task6]); + resolve(); + }); + }, bufferMaxDuration + 1); + + expect(bulkUpdate).toHaveBeenCalledTimes(1); + Promise.all([bufferedUpdate(task3), bufferedUpdate(task4)]).then((_) => { + expect(bulkUpdate).toHaveBeenCalledTimes(2); + expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]); + }); + }, bufferMaxDuration + 1); + }); + }); + + test('batch updates are executed once queue hits a certain bound', async () => { + const bulkUpdate: jest.Mocked> = jest.fn((tasks) => { + return Promise.resolve(tasks.map(incrementAttempts)); + }); + + const bufferedUpdate = createBuffer(bulkUpdate, { + bufferMaxDuration: 100, + bufferMaxOperations: 2, + }); + + const task1 = createTask(); + const task2 = createTask(); + const task3 = createTask(); + const task4 = createTask(); + const task5 = createTask(); + + return new Promise((resolve) => { + bufferedUpdate(task1); + bufferedUpdate(task2); + bufferedUpdate(task3); + bufferedUpdate(task4); + + setTimeout(() => { + expect(bulkUpdate).toHaveBeenCalledTimes(2); + expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]); + expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]); + + setTimeout(() => { + expect(bulkUpdate).toHaveBeenCalledTimes(2); + bufferedUpdate(task5).then((_) => { + expect(bulkUpdate).toHaveBeenCalledTimes(3); + expect(bulkUpdate).toHaveBeenCalledWith([task5]); + resolve(); + }); + }, 50); + }, 50); + }); + }); + + test('queue upper bound is reset after each flush', async () => { + const bulkUpdate: jest.Mocked> = jest.fn((tasks) => { + return Promise.resolve(tasks.map(incrementAttempts)); + }); + + const bufferMaxDuration = 100; + const bufferedUpdate = createBuffer(bulkUpdate, { + bufferMaxDuration, + bufferMaxOperations: 3, + }); + + const task1 = createTask(); + const task2 = createTask(); + const task3 = createTask(); + const task4 = createTask(); + + return new Promise((resolve) => { + bufferedUpdate(task1); + bufferedUpdate(task2); + + setTimeout(() => { + expect(bulkUpdate).toHaveBeenCalledTimes(1); + expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]); + + bufferedUpdate(task3); + bufferedUpdate(task4); + + setTimeout(() => { + expect(bulkUpdate).toHaveBeenCalledTimes(1); + + setTimeout(() => { + expect(bulkUpdate).toHaveBeenCalledTimes(2); + expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]); + resolve(); + }, bufferMaxDuration / 2); + }, bufferMaxDuration / 2); + }, bufferMaxDuration + 1); + }); + }); + test('handles both resolutions and rejections at individual task level', async (done) => { + const bulkUpdate: jest.Mocked> = jest.fn( + ([task1, task2, task3]) => { + return Promise.resolve([ + incrementAttempts(task1), + errorAttempts(task2), + incrementAttempts(task3), + ]); + } + ); + + const bufferedUpdate = createBuffer(bulkUpdate); + + const task1 = createTask(); + const task2 = createTask(); + const task3 = createTask(); + + return Promise.all([ + expect(bufferedUpdate(task1)).resolves.toMatchObject(incrementAttempts(task1)), + expect(bufferedUpdate(task2)).rejects.toMatchObject( + mapErr( + (err: OperationError) => asErr(err.error), + errorAttempts(task2) + ) + ), + expect(bufferedUpdate(task3)).resolves.toMatchObject(incrementAttempts(task3)), + ]).then(() => { + expect(bulkUpdate).toHaveBeenCalledTimes(1); + done(); + }); + }); + + test('handles bulkUpdate failure', async (done) => { + const bulkUpdate: jest.Mocked> = jest.fn(() => { + return Promise.reject(new Error('bulkUpdate is an illusion')); + }); + + const bufferedUpdate = createBuffer(bulkUpdate); + + const task1 = createTask(); + const task2 = createTask(); + const task3 = createTask(); + + return Promise.all([ + expect(bufferedUpdate(task1)).rejects.toMatchInlineSnapshot(` + Object { + "error": [Error: bulkUpdate is an illusion], + "tag": "err", + } + `), + expect(bufferedUpdate(task2)).rejects.toMatchInlineSnapshot(` + Object { + "error": [Error: bulkUpdate is an illusion], + "tag": "err", + } + `), + expect(bufferedUpdate(task3)).rejects.toMatchInlineSnapshot(` + Object { + "error": [Error: bulkUpdate is an illusion], + "tag": "err", + } + `), + ]).then(() => { + expect(bulkUpdate).toHaveBeenCalledTimes(1); + done(); + }); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts new file mode 100644 index 0000000000000..fca7ce02e0cd7 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { keyBy, map } from 'lodash'; +import { Subject, race, from } from 'rxjs'; +import { bufferWhen, filter, bufferCount, flatMap, mapTo, first } from 'rxjs/operators'; +import { either, Result, asOk, asErr, Ok, Err } from './result_type'; + +export interface BufferOptions { + bufferMaxDuration?: number; + bufferMaxOperations?: number; +} + +export interface Entity { + id: string; +} + +export interface OperationError { + entity: Input; + error: ErrorOutput; +} + +export type OperationResult = Result< + Output, + OperationError +>; + +export type Operation = ( + entity: Input +) => Promise>; + +export type BulkOperation = ( + entities: Input[] +) => Promise>>; + +const DONT_FLUSH = false; +const FLUSH = true; + +export function createBuffer( + bulkOperation: BulkOperation, + { bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE }: BufferOptions = {} +): Operation { + const flushBuffer = new Subject(); + + const storeUpdateBuffer = new Subject<{ + entity: Input; + onSuccess: (entity: Ok) => void; + onFailure: (error: Err) => void; + }>(); + + storeUpdateBuffer + .pipe( + bufferWhen(() => flushBuffer), + filter((tasks) => tasks.length > 0) + ) + .subscribe((entities) => { + const entityById = keyBy(entities, ({ entity: { id } }) => id); + bulkOperation(map(entities, 'entity')) + .then((results) => { + results.forEach((result) => + either( + result, + (entity) => { + entityById[entity.id].onSuccess(asOk(entity)); + }, + ({ entity, error }: OperationError) => { + entityById[entity.id].onFailure(asErr(error)); + } + ) + ); + }) + .catch((ex) => { + entities.forEach(({ onFailure }) => onFailure(asErr(ex))); + }); + }); + + let countInBuffer = 0; + const flushAndResetCounter = () => { + countInBuffer = 0; + flushBuffer.next(); + }; + storeUpdateBuffer + .pipe( + // complete once the buffer has either filled to `bufferMaxOperations` or + // a `bufferMaxDuration` has passed. Default to `bufferMaxDuration` being the + // current event loop tick rather than a fixed duration + flatMap(() => { + return ++countInBuffer === 1 + ? race([ + // the race is started in response to the first operation into the buffer + // so we flush once the remaining operations come in (which is `bufferMaxOperations - 1`) + storeUpdateBuffer.pipe(bufferCount(bufferMaxOperations - 1)), + bufferMaxDuration + ? // if theres a max duration, flush buffer based on that + from(resolveIn(bufferMaxDuration)) + : // ensure we flush by the end of the "current" event loop tick + from(resolveImmediate()), + ]).pipe(first(), mapTo(FLUSH)) + : from([DONT_FLUSH]); + }), + filter((shouldFlush) => shouldFlush) + ) + .subscribe({ + next: flushAndResetCounter, + // As this stream is just trying to decide when to flush + // there's no data to lose, so in the case that an error + // is thrown, lets just flush + error: flushAndResetCounter, + }); + + return async function (entity: Input) { + return new Promise((resolve, reject) => { + storeUpdateBuffer.next({ entity, onSuccess: resolve, onFailure: reject }); + }); + }; +} + +function resolveImmediate() { + return new Promise(setImmediate); +} + +function resolveIn(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} diff --git a/x-pack/plugins/task_manager/server/lib/result_type.test.ts b/x-pack/plugins/task_manager/server/lib/result_type.test.ts new file mode 100644 index 0000000000000..480a732f1f617 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/result_type.test.ts @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import uuid from 'uuid'; +import { unwrapPromise, asOk, asErr } from './result_type'; + +describe(`Result`, () => { + describe(`unwrapPromise`, () => { + test(`unwraps OKs from the result`, async () => { + const uniqueId = uuid.v4(); + expect(await unwrapPromise(Promise.resolve(asOk(uniqueId)))).toEqual(uniqueId); + }); + + test(`unwraps Errs from the result`, async () => { + const uniqueId = uuid.v4(); + expect(unwrapPromise(Promise.resolve(asErr(uniqueId)))).rejects.toEqual(uniqueId); + }); + + test(`unwraps Errs from the result when promise rejects`, async () => { + const uniqueId = uuid.v4(); + expect(unwrapPromise(Promise.reject(asErr(uniqueId)))).rejects.toEqual(uniqueId); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/result_type.ts b/x-pack/plugins/task_manager/server/lib/result_type.ts index edf4d84dd226d..d21c17d3bb5b3 100644 --- a/x-pack/plugins/task_manager/server/lib/result_type.ts +++ b/x-pack/plugins/task_manager/server/lib/result_type.ts @@ -47,6 +47,25 @@ export async function promiseResult(future: Promise): Promise(future: Promise>): Promise { + return future + .catch( + // catch rejection as we expect the result of the rejected promise + // to be wrapped in a Result - sadly there's no way to "Type" this + // requirment in Typescript as Promises do not enfore a type on their + // rejection + // The `then` will then unwrap the Result from around `ex` for us + (ex: Err) => ex + ) + .then((result: Result) => + map( + result, + (value: T) => Promise.resolve(value), + (err: E) => Promise.reject(err) + ) + ); +} + export function unwrap(result: Result): T | E { return isOk(result) ? result.value : result.error; } diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts index 23cb33cfac6c2..35ca439bb9130 100644 --- a/x-pack/plugins/task_manager/server/task_manager.ts +++ b/x-pack/plugins/task_manager/server/task_manager.ts @@ -57,6 +57,7 @@ import { } from './task_store'; import { identifyEsError } from './lib/identify_es_error'; import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields'; +import { BufferedTaskStore } from './buffered_task_store'; const VERSION_CONFLICT_STATUS = 409; @@ -90,7 +91,10 @@ export type TaskLifecycleEvent = TaskMarkRunning | TaskRun | TaskClaim | TaskRun */ export class TaskManager { private definitions: TaskDictionary = {}; + private store: TaskStore; + private bufferedStore: BufferedTaskStore; + private logger: Logger; private pool: TaskPool; // all task related events (task claimed, task marked as running, etc.) are emitted through events$ @@ -139,6 +143,10 @@ export class TaskManager { // pipe store events into the TaskManager's event stream this.store.events.subscribe((event) => this.events$.next(event)); + this.bufferedStore = new BufferedTaskStore(this.store, { + bufferMaxOperations: opts.config.max_workers, + }); + this.pool = new TaskPool({ logger: this.logger, maxWorkers: opts.config.max_workers, @@ -165,7 +173,7 @@ export class TaskManager { return new TaskManagerRunner({ logger: this.logger, instance, - store: this.store, + store: this.bufferedStore, definitions: this.definitions, beforeRun: this.middleware.beforeRun, beforeMarkRunning: this.middleware.beforeMarkRunning, diff --git a/x-pack/plugins/task_manager/server/task_runner.ts b/x-pack/plugins/task_manager/server/task_runner.ts index 4c690a5675f61..ebf13fac2f311 100644 --- a/x-pack/plugins/task_manager/server/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_runner.ts @@ -49,7 +49,7 @@ export interface TaskRunner { toString: () => string; } -interface Updatable { +export interface Updatable { readonly maxAttempts: number; update(doc: ConcreteTaskInstance): Promise; remove(id: string): Promise; diff --git a/x-pack/plugins/task_manager/server/task_store.mock.ts b/x-pack/plugins/task_manager/server/task_store.mock.ts new file mode 100644 index 0000000000000..86db695bc5e2c --- /dev/null +++ b/x-pack/plugins/task_manager/server/task_store.mock.ts @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { TaskStore } from './task_store'; + +interface TaskStoreOptions { + maxAttempts?: number; + index?: string; + taskManagerId?: string; +} +export const taskStoreMock = { + create({ maxAttempts = 0, index = '', taskManagerId = '' }: TaskStoreOptions) { + const mocked = ({ + update: jest.fn(), + remove: jest.fn(), + schedule: jest.fn(), + claimAvailableTasks: jest.fn(), + bulkUpdate: jest.fn(), + get: jest.fn(), + getLifecycle: jest.fn(), + fetch: jest.fn(), + maxAttempts, + index, + taskManagerId, + } as unknown) as jest.Mocked; + return mocked; + }, +}; diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 4a691e17011e8..7ec3db5c99aa7 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -17,9 +17,10 @@ import { SavedObjectsSerializer, SavedObjectsRawDoc, ISavedObjectsRepository, + SavedObjectsUpdateResponse, } from '../../../../src/core/server'; -import { asOk, asErr } from './lib/result_type'; +import { asOk, asErr, Result } from './lib/result_type'; import { ConcreteTaskInstance, @@ -98,10 +99,10 @@ export interface ClaimOwnershipResult { docs: ConcreteTaskInstance[]; } -export interface BulkUpdateTaskFailureResult { - error: NonNullable; - task: ConcreteTaskInstance; -} +export type BulkUpdateResult = Result< + ConcreteTaskInstance, + { entity: ConcreteTaskInstance; error: Error } +>; export interface UpdateByQueryResult { updated: number; @@ -332,6 +333,54 @@ export class TaskStore { ); } + /** + * Updates the specified docs in the index, returning the docs + * with their versions up to date. + * + * @param {Array} docs + * @returns {Promise>} + */ + public async bulkUpdate(docs: ConcreteTaskInstance[]): Promise { + const attributesByDocId = docs.reduce((attrsById, doc) => { + attrsById.set(doc.id, taskInstanceToAttributes(doc)); + return attrsById; + }, new Map()); + + const updatedSavedObjects: Array = ( + await this.savedObjectsRepository.bulkUpdate( + docs.map((doc) => ({ + type: 'task', + id: doc.id, + options: { version: doc.version }, + attributes: attributesByDocId.get(doc.id)!, + })), + { + refresh: false, + } + ) + ).saved_objects; + + return updatedSavedObjects.map((updatedSavedObject, index) => + isSavedObjectsUpdateResponse(updatedSavedObject) + ? asOk( + savedObjectToConcreteTaskInstance({ + ...updatedSavedObject, + attributes: defaults( + updatedSavedObject.attributes, + attributesByDocId.get(updatedSavedObject.id)! + ), + }) + ) + : asErr({ + // The SavedObjectsRepository maintains the order of the docs + // so we can rely on the index in the `docs` to match an error + // on the same index in the `bulkUpdate` result + entity: docs[index], + error: updatedSavedObject, + }) + ); + } + /** * Removes the specified task from the index. * @@ -468,3 +517,9 @@ function ensureQueryOnlyReturnsTaskObjects(opts: SearchOpts): SearchOpts { query, }; } + +function isSavedObjectsUpdateResponse( + result: SavedObjectsUpdateResponse | Error +): result is SavedObjectsUpdateResponse { + return result && typeof (result as SavedObjectsUpdateResponse).id === 'string'; +}