Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task Manager] Addresses flaky test introduced by buffered store #72815

Merged
merged 3 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ function errorAttempts(task: TaskInstance): Err<OperationError<TaskInstance, Err
});
}

describe.skip('Bulk Operation Buffer', () => {
describe('Bulk Operation Buffer', () => {
describe('createBuffer()', () => {
test('batches up multiple Operation calls', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
Expand All @@ -54,48 +54,6 @@ describe.skip('Bulk Operation Buffer', () => {
expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]);
});

test('batch updates are executed at most by the next Event Loop tick by default', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
return Promise.resolve(tasks.map(incrementAttempts));
});

const bufferedUpdate = createBuffer(bulkUpdate);

const task1 = createTask();
const task2 = createTask();
const task3 = createTask();
const task4 = createTask();
const task5 = createTask();
const task6 = createTask();

return new Promise((resolve) => {
Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)]).then((_) => {
expect(bulkUpdate).toHaveBeenCalledTimes(1);
expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]);
expect(bulkUpdate).not.toHaveBeenCalledWith([task3, task4]);
});

setTimeout(() => {
// on next tick
setTimeout(() => {
// on next tick
expect(bulkUpdate).toHaveBeenCalledTimes(2);
Promise.all([bufferedUpdate(task5), bufferedUpdate(task6)]).then((_) => {
expect(bulkUpdate).toHaveBeenCalledTimes(3);
expect(bulkUpdate).toHaveBeenCalledWith([task5, task6]);
resolve();
});
}, 0);

expect(bulkUpdate).toHaveBeenCalledTimes(1);
Promise.all([bufferedUpdate(task3), bufferedUpdate(task4)]).then((_) => {
expect(bulkUpdate).toHaveBeenCalledTimes(2);
expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]);
});
}, 0);
});
});

test('batch updates can be customised to execute after a certain period', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
return Promise.resolve(tasks.map(incrementAttempts));
Expand Down
11 changes: 2 additions & 9 deletions x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,8 @@ export function createBuffer<Input extends Entity, ErrorOutput, Output extends E
// the race is started in response to the first operation into the buffer
// so we flush once the remaining operations come in (which is `bufferMaxOperations - 1`)
storeUpdateBuffer.pipe(bufferCount(bufferMaxOperations - 1)),
bufferMaxDuration
? // if theres a max duration, flush buffer based on that
from(resolveIn(bufferMaxDuration))
: // ensure we flush by the end of the "current" event loop tick
from(resolveImmediate()),
// flush buffer once max duration has passed
from(resolveIn(bufferMaxDuration)),
]).pipe(first(), mapTo(FLUSH))
: from([DONT_FLUSH]);
}),
Expand All @@ -118,10 +115,6 @@ export function createBuffer<Input extends Entity, ErrorOutput, Output extends E
};
}

function resolveImmediate() {
return new Promise(setImmediate);
}

function resolveIn(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
Expand Down