Skip to content

Commit

Permalink
[Task Manager] Addresses flaky test introduced by buffered store (#72815
Browse files Browse the repository at this point in the history
)

Removed unused functionality which we weren't using anyway and was causing some flaky behaviour.
  • Loading branch information
gmmorris authored Jul 22, 2020
1 parent cb0405e commit a41633d
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ function errorAttempts(task: TaskInstance): Err<OperationError<TaskInstance, Err
});
}

describe.skip('Bulk Operation Buffer', () => {
describe('Bulk Operation Buffer', () => {
describe('createBuffer()', () => {
test('batches up multiple Operation calls', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
Expand All @@ -54,48 +54,6 @@ describe.skip('Bulk Operation Buffer', () => {
expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]);
});

test('batch updates are executed at most by the next Event Loop tick by default', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
return Promise.resolve(tasks.map(incrementAttempts));
});

const bufferedUpdate = createBuffer(bulkUpdate);

const task1 = createTask();
const task2 = createTask();
const task3 = createTask();
const task4 = createTask();
const task5 = createTask();
const task6 = createTask();

return new Promise((resolve) => {
Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)]).then((_) => {
expect(bulkUpdate).toHaveBeenCalledTimes(1);
expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]);
expect(bulkUpdate).not.toHaveBeenCalledWith([task3, task4]);
});

setTimeout(() => {
// on next tick
setTimeout(() => {
// on next tick
expect(bulkUpdate).toHaveBeenCalledTimes(2);
Promise.all([bufferedUpdate(task5), bufferedUpdate(task6)]).then((_) => {
expect(bulkUpdate).toHaveBeenCalledTimes(3);
expect(bulkUpdate).toHaveBeenCalledWith([task5, task6]);
resolve();
});
}, 0);

expect(bulkUpdate).toHaveBeenCalledTimes(1);
Promise.all([bufferedUpdate(task3), bufferedUpdate(task4)]).then((_) => {
expect(bulkUpdate).toHaveBeenCalledTimes(2);
expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]);
});
}, 0);
});
});

test('batch updates can be customised to execute after a certain period', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
return Promise.resolve(tasks.map(incrementAttempts));
Expand Down
11 changes: 2 additions & 9 deletions x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,8 @@ export function createBuffer<Input extends Entity, ErrorOutput, Output extends E
// the race is started in response to the first operation into the buffer
// so we flush once the remaining operations come in (which is `bufferMaxOperations - 1`)
storeUpdateBuffer.pipe(bufferCount(bufferMaxOperations - 1)),
bufferMaxDuration
? // if theres a max duration, flush buffer based on that
from(resolveIn(bufferMaxDuration))
: // ensure we flush by the end of the "current" event loop tick
from(resolveImmediate()),
// flush buffer once max duration has passed
from(resolveIn(bufferMaxDuration)),
]).pipe(first(), mapTo(FLUSH))
: from([DONT_FLUSH]);
}),
Expand All @@ -118,10 +115,6 @@ export function createBuffer<Input extends Entity, ErrorOutput, Output extends E
};
}

function resolveImmediate() {
return new Promise(setImmediate);
}

function resolveIn(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
Expand Down

0 comments on commit a41633d

Please sign in to comment.