Skip to content

Commit

Permalink
Adding tests and updating ConcreteTaskInstance interface with tracepa…
Browse files Browse the repository at this point in the history
…rent
  • Loading branch information
ymao1 committed May 25, 2021
1 parent 419a10b commit 0be04d6
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ describe('taskRunner', () => {
ownerId: '234',
taskType: 'foo',
params: {},
traceparent: '',
};

beforeEach(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ beforeAll(() => {
actionTaskParamsId: '3',
},
taskType: 'actions:1',
traceparent: '',
};
taskRunnerFactory = new TaskRunnerFactory(mockedActionExecutor);
mockedActionExecutor.initialize(actionExecutorInitializerParams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,6 @@ function mockTask() {
user: undefined,
version: '123',
ownerId: '123',
traceparent: '',
};
}
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/lib/fill_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ describe('fillPool', () => {
taskType: '',
params: {},
ownerId: null,
traceparent: '',
}));

test('fills task pool with all claimed tasks until fetchAvailableTasks stream closes', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ const mockTaskInstance = (overrides: Partial<ConcreteTaskInstance> = {}): Concre
alertId: '1',
},
ownerId: null,
traceparent: '',
...overrides,
});

Expand Down
74 changes: 73 additions & 1 deletion x-pack/plugins/task_manager/server/queries/task_claiming.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import { mockLogger } from '../test_utils';
import { TaskClaiming, OwnershipClaimingOpts, TaskClaimingOpts } from './task_claiming';
import { Observable } from 'rxjs';
import { taskStoreMock } from '../task_store.mock';
import apm from 'elastic-apm-node';

const taskManagerLogger = mockLogger();

beforeEach(() => jest.resetAllMocks());
beforeEach(() => jest.clearAllMocks());

const mockedDate = new Date('2019-02-12T21:01:22.479Z');
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -52,7 +53,19 @@ taskDefinitions.registerTaskDefinitions({
},
});

const mockApmTrans = {
end: jest.fn(),
};

describe('TaskClaiming', () => {
beforeEach(() => {
jest.clearAllMocks();
jest
.spyOn(apm, 'startTransaction')
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.mockImplementation(() => mockApmTrans as any);
});

test(`should log when a certain task type is skipped due to having a zero concurency configuration`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
Expand Down Expand Up @@ -169,6 +182,12 @@ describe('TaskClaiming', () => {

const results = await getAllAsPromise(taskClaiming.claimAvailableTasks(claimingOpts));

expect(apm.startTransaction).toHaveBeenCalledWith(
'markAvailableTasksAsClaimed',
'taskManager markAvailableTasksAsClaimed'
);
expect(mockApmTrans.end).toHaveBeenCalledWith('success');

expect(store.updateByQuery.mock.calls[0][1]).toMatchObject({
max_docs: getCapacity(),
});
Expand All @@ -187,6 +206,49 @@ describe('TaskClaiming', () => {
}));
}

test('makes calls to APM as expected when markAvailableTasksAsClaimed throws error', async () => {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);

const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
title: 'foo',
createTaskRunner: jest.fn(),
},
bar: {
title: 'bar',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
});

const { taskClaiming, store } = initialiseTestClaiming({
storeOpts: {
definitions,
},
taskClaimingOpts: {
maxAttempts,
},
});

store.updateByQuery.mockRejectedValue(new Error('Oh no'));

await expect(
getAllAsPromise(
taskClaiming.claimAvailableTasks({
claimOwnershipUntil: new Date(),
})
)
).rejects.toMatchInlineSnapshot(`[Error: Oh no]`);

expect(apm.startTransaction).toHaveBeenCalledWith(
'markAvailableTasksAsClaimed',
'taskManager markAvailableTasksAsClaimed'
);
expect(mockApmTrans.end).toHaveBeenCalledWith('failure');
});

test('it filters claimed tasks down by supported types, maxAttempts, status, and runAt', async () => {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);
Expand Down Expand Up @@ -1105,6 +1167,7 @@ if (doc['task.runAt'].size()!=0) {
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: 'parent',
},
{
id: 'claimed-by-schedule',
Expand All @@ -1121,6 +1184,7 @@ if (doc['task.runAt'].size()!=0) {
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: 'newParent',
},
{
id: 'already-running',
Expand All @@ -1137,6 +1201,7 @@ if (doc['task.runAt'].size()!=0) {
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: '',
},
];

Expand Down Expand Up @@ -1222,6 +1287,7 @@ if (doc['task.runAt'].size()!=0) {
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: 'parent',
})
)
);
Expand Down Expand Up @@ -1277,6 +1343,7 @@ if (doc['task.runAt'].size()!=0) {
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: '',
},
],
// second cycle
Expand All @@ -1296,6 +1363,7 @@ if (doc['task.runAt'].size()!=0) {
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: '',
},
],
],
Expand Down Expand Up @@ -1347,6 +1415,7 @@ if (doc['task.runAt'].size()!=0) {
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: '',
}),
errorType: TaskClaimErrorType.CLAIMED_BY_ID_OUT_OF_CAPACITY,
})
Expand Down Expand Up @@ -1393,6 +1462,7 @@ if (doc['task.runAt'].size()!=0) {
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: 'newParent',
})
)
);
Expand Down Expand Up @@ -1437,6 +1507,7 @@ if (doc['task.runAt'].size()!=0) {
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: '',
}),
errorType: TaskClaimErrorType.CLAIMED_BY_ID_NOT_IN_CLAIMING_STATUS,
})
Expand Down Expand Up @@ -1499,6 +1570,7 @@ function mockInstance(instance: Partial<ConcreteTaskInstance> = {}) {
status: 'idle',
user: 'example',
ownerId: null,
traceparent: '',
},
instance
);
Expand Down
8 changes: 8 additions & 0 deletions x-pack/plugins/task_manager/server/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ export interface TaskInstance {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
state: Record<string, any>;

/**
* The serialized traceparent string of the current APM transaction or span.
*/
traceparent?: string;

/**
Expand Down Expand Up @@ -358,6 +361,11 @@ export interface ConcreteTaskInstance extends TaskInstance {
* The random uuid of the Kibana instance which claimed ownership of the task last
*/
ownerId: string | null;

/**
* The serialized traceparent string of the current APM transaction or span.
*/
traceparent: string;
}

export type SerializedConcreteTaskInstance = Omit<
Expand Down
113 changes: 113 additions & 0 deletions x-pack/plugins/task_manager/server/task_running/task_runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { TaskDefinitionRegistry, TaskTypeDictionary } from '../task_type_diction
import { mockLogger } from '../test_utils';
import { throwUnrecoverableError } from './errors';
import { taskStoreMock } from '../task_store.mock';
import apm from 'elastic-apm-node';

const minutesFromNow = (mins: number): Date => secondsFromNow(mins * 60);

Expand All @@ -32,8 +33,70 @@ afterAll(() => fakeTimer.restore());
describe('TaskManagerRunner', () => {
const pendingStageSetup = (opts: TestOpts) => testOpts(TaskRunningStage.PENDING, opts);
const readyToRunStageSetup = (opts: TestOpts) => testOpts(TaskRunningStage.READY_TO_RUN, opts);
const mockApmTrans = {
end: jest.fn(),
};

describe('Pending Stage', () => {
beforeEach(() => {
jest.clearAllMocks();
jest
.spyOn(apm, 'startTransaction')
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.mockImplementation(() => mockApmTrans as any);
});
test('makes calls to APM as expected when task markedAsRunning is success', async () => {
const { runner } = await pendingStageSetup({
instance: {
schedule: {
interval: '10m',
},
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});
await runner.markTaskAsRunning();
expect(apm.startTransaction).toHaveBeenCalledWith(
'taskManager',
'taskManager markTaskAsRunning'
);
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
});
test('makes calls to APM as expected when task markedAsRunning fails', async () => {
const { runner, store } = await pendingStageSetup({
instance: {
schedule: {
interval: '10m',
},
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});
store.update.mockRejectedValue(
SavedObjectsErrorHelpers.createGenericNotFoundError('type', 'id')
);
await expect(runner.markTaskAsRunning()).rejects.toMatchInlineSnapshot(
`[Error: Saved object [type/id] not found]`
);
// await runner.markTaskAsRunning();
expect(apm.startTransaction).toHaveBeenCalledWith(
'taskManager',
'taskManager markTaskAsRunning'
);
expect(mockApmTrans.end).toHaveBeenCalledWith('failure');
});
test('provides details about the task that is running', async () => {
const { runner } = await pendingStageSetup({
instance: {
Expand Down Expand Up @@ -572,6 +635,55 @@ describe('TaskManagerRunner', () => {
});

describe('Ready To Run Stage', () => {
beforeEach(() => {
jest.clearAllMocks();
});
test('makes calls to APM as expected when task runs successfully', async () => {
const { runner } = await readyToRunStageSetup({
instance: {
params: { a: 'b' },
state: { hey: 'there' },
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { state: {} };
},
}),
},
},
});
await runner.run();
expect(apm.startTransaction).toHaveBeenCalledWith('bar', 'taskManager run', {
childOf: 'apmTraceparent',
});
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
});
test('makes calls to APM as expected when task fails', async () => {
const { runner } = await readyToRunStageSetup({
instance: {
params: { a: 'b' },
state: { hey: 'there' },
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
throw new Error('rar');
},
}),
},
},
});
await runner.run();
expect(apm.startTransaction).toHaveBeenCalledWith('bar', 'taskManager run', {
childOf: 'apmTraceparent',
});
expect(mockApmTrans.end).toHaveBeenCalledWith('failure');
});
test('queues a reattempt if the task fails', async () => {
const initialAttempts = _.random(0, 2);
const id = Date.now().toString();
Expand Down Expand Up @@ -1275,6 +1387,7 @@ describe('TaskManagerRunner', () => {
status: 'idle',
user: 'example',
ownerId: null,
traceparent: 'apmTraceparent',
},
instance
);
Expand Down
Loading

0 comments on commit 0be04d6

Please sign in to comment.