Skip to content

Commit

Permalink
[Task Manager] Batches the update operations in Task Manager (elastic…
Browse files Browse the repository at this point in the history
…#71470)

This PR attempts to batch update tasks in Task Manager in order to avoid overloading the Elasticsearch queue.
This is the 1st PR addressing elastic#65551

Under the hood we now use a Reactive buffer accumulates all calls to the `update` api in the TaskStore and flushes after 50ms or when as many operations as there are workers have been buffered (whichever comes first).
  • Loading branch information
gmmorris committed Jul 21, 2020
1 parent 9cf319d commit a342260
Show file tree
Hide file tree
Showing 10 changed files with 685 additions and 7 deletions.
82 changes: 82 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import uuid from 'uuid';
import { taskStoreMock } from './task_store.mock';
import { BufferedTaskStore } from './buffered_task_store';
import { asErr, asOk } from './lib/result_type';
import { TaskStatus } from './task';

describe('Buffered Task Store', () => {
test('proxies the TaskStore for `maxAttempts` and `remove`', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
taskStore.bulkUpdate.mockResolvedValue([]);
const bufferedStore = new BufferedTaskStore(taskStore, {});

expect(bufferedStore.maxAttempts).toEqual(10);

bufferedStore.remove('1');
expect(taskStore.remove).toHaveBeenCalledWith('1');
});

describe('update', () => {
test("proxies the TaskStore's `bulkUpdate`", async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const bufferedStore = new BufferedTaskStore(taskStore, {});

const task = mockTask();

taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);

expect(await bufferedStore.update(task)).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]);
});

test('handles partially successfull bulkUpdates resolving each call appropriately', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const bufferedStore = new BufferedTaskStore(taskStore, {});

const tasks = [mockTask(), mockTask(), mockTask()];

taskStore.bulkUpdate.mockResolvedValueOnce([
asOk(tasks[0]),
asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }),
asOk(tasks[2]),
]);

const results = [
bufferedStore.update(tasks[0]),
bufferedStore.update(tasks[1]),
bufferedStore.update(tasks[2]),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(
`[Error: Oh no, something went terribly wrong]`
);
expect(await results[2]).toMatchObject(tasks[2]);
});
});
});

function mockTask() {
return {
id: `task_${uuid.v4()}`,
attempts: 0,
schedule: undefined,
params: { hello: 'world' },
retryAt: null,
runAt: new Date(),
scheduledAt: new Date(),
scope: undefined,
startedAt: null,
state: { foo: 'bar' },
status: TaskStatus.Idle,
taskType: 'report',
user: undefined,
version: '123',
ownerId: '123',
};
}
39 changes: 39 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { TaskStore } from './task_store';
import { ConcreteTaskInstance } from './task';
import { Updatable } from './task_runner';
import { createBuffer, Operation, BufferOptions } from './lib/bulk_operation_buffer';
import { unwrapPromise } from './lib/result_type';

// by default allow updates to be buffered for up to 50ms
const DEFAULT_BUFFER_MAX_DURATION = 50;

export class BufferedTaskStore implements Updatable {
private bufferedUpdate: Operation<ConcreteTaskInstance, Error>;
constructor(private readonly taskStore: TaskStore, options: BufferOptions) {
this.bufferedUpdate = createBuffer<ConcreteTaskInstance, Error>(
(docs) => taskStore.bulkUpdate(docs),
{
bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION,
...options,
}
);
}

public get maxAttempts(): number {
return this.taskStore.maxAttempts;
}

public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
return unwrapPromise(this.bufferedUpdate(doc));
}

public async remove(id: string): Promise<void> {
return this.taskStore.remove(id);
}
}
Loading

0 comments on commit a342260

Please sign in to comment.