Skip to content

Commit

Permalink
Refactor TM to update the tasks that has state validation error (elas…
Browse files Browse the repository at this point in the history
…tic#176415)

Resolves: elastic#172605

This PR makes TM TaskRunner to handle state validation errors gracefully
to allow it update the task state.

## To verify:

1 - Create a rule with some actions.
2- Throw an error in [state validation
function](https://github.com/elastic/kibana/pull/176415/files#diff-ae4166cd6b3509473867eaed0e7b974a15b9c0268225131aef1b00d61e800e89R428)
to force it to return an error.
3- Expect the rule tasks to run and update the task state successfully
rather than throwing an error and preventing task update.
  • Loading branch information
ersin-erdal authored Feb 9, 2024
1 parent 44df1f4 commit e20e659
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ describe('task state validation', () => {

it('should fail the task run when setting allow_reading_invalid_state:false and reading an invalid state', async () => {
const logSpy = jest.spyOn(pollingLifecycleOpts.logger, 'warn');
const updateSpy = jest.spyOn(pollingLifecycleOpts.taskStore, 'bulkUpdate');

const id = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
Expand All @@ -331,8 +332,9 @@ describe('task state validation', () => {
expect(logSpy.mock.calls[0][0]).toBe(
`Task (fooType/${id}) has a validation error: [foo]: expected value of type [string] but got [boolean]`
);
expect(logSpy.mock.calls[1][0]).toBe(
`Task fooType \"${id}\" failed in attempt to run: [foo]: expected value of type [string] but got [boolean]`
expect(updateSpy).toHaveBeenCalledWith(
expect.arrayContaining([expect.objectContaining({ id, taskType: 'fooType' })]),
{ validate: false }
);
});
});
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export type SuccessfulRunResult = {
state: Record<string, unknown>;
taskRunError?: DecoratedError;
skipAttempts?: number;
shouldValidate?: boolean;
} & (
| // ensure a SuccessfulRunResult can either specify a new `runAt` or a new `schedule`, but not both
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,7 @@ describe('TaskManagerRunner', () => {
await runner.run();

expect(store.update).toHaveBeenCalledTimes(1);
expect(store.update).toHaveBeenCalledWith(expect.any(Object), { validate: true });
const instance = store.update.mock.calls[0][0];

expect(instance.runAt.getTime()).toEqual(nextRetry.getTime());
Expand Down Expand Up @@ -1113,6 +1114,8 @@ describe('TaskManagerRunner', () => {
await runner.run();

expect(store.update).toHaveBeenCalledTimes(1);
expect(store.update).toHaveBeenCalledWith(expect.any(Object), { validate: true });

const instance = store.update.mock.calls[0][0];

const minRunAt = Date.now();
Expand Down Expand Up @@ -1179,6 +1182,8 @@ describe('TaskManagerRunner', () => {
await runner.run();

expect(store.update).toHaveBeenCalledTimes(1);
expect(store.update).toHaveBeenCalledWith(expect.any(Object), { validate: true });

sinon.assert.notCalled(getRetryStub);
const instance = store.update.mock.calls[0][0];

Expand Down Expand Up @@ -1252,6 +1257,7 @@ describe('TaskManagerRunner', () => {
new Date(Date.now() + intervalSeconds * 1000).getTime()
);
expect(instance.enabled).not.toBeDefined();
expect(store.update).toHaveBeenCalledWith(expect.any(Object), { validate: true });
});

test('throws error when the task has invalid state', async () => {
Expand All @@ -1266,7 +1272,7 @@ describe('TaskManagerRunner', () => {
stateVersion: 4,
};

const { runner, logger } = await readyToRunStageSetup({
const { runner, logger, store } = await readyToRunStageSetup({
instance: mockTaskInstance,
definitions: {
bar: {
Expand Down Expand Up @@ -1308,13 +1314,19 @@ describe('TaskManagerRunner', () => {
},
});

expect(() => runner.run()).rejects.toMatchInlineSnapshot(
`[Error: [foo]: expected value of type [string] but got [boolean]]`
);
expect(await runner.run()).toEqual({
error: {
error: new Error('[foo]: expected value of type [string] but got [boolean]'),
shouldValidate: false,
state: { bar: 'test', baz: 'test', foo: true },
},
tag: 'err',
});
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn).toHaveBeenCalledWith(
'Task (bar/foo) has a validation error: [foo]: expected value of type [string] but got [boolean]'
);
expect(store.update).toHaveBeenCalledWith(expect.any(Object), { validate: false });
});

test('does not throw error and runs when the task has invalid state and allowReadingInvalidState = true', async () => {
Expand Down
32 changes: 24 additions & 8 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,16 +314,30 @@ export class TaskManagerRunner implements TaskRunner {
const apmTrans = apm.startTransaction(this.taskType, TASK_MANAGER_RUN_TRANSACTION_TYPE, {
childOf: this.instance.task.traceparent,
});
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.eventLoopDelayConfig);

// Validate state
const validatedTaskInstance = this.validateTaskState(this.instance.task);
const stateValidationResult = this.validateTaskState(this.instance.task);

if (stateValidationResult.error) {
const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () =>
this.processResult(
asErr({
error: stateValidationResult.error,
state: stateValidationResult.taskInstance.state,
shouldValidate: false,
}),
stopTaskTimer()
)
);
if (apmTrans) apmTrans.end('failure');
return processedResult;
}

const modifiedContext = await this.beforeRun({
taskInstance: validatedTaskInstance,
taskInstance: stateValidationResult.taskInstance,
});

const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.eventLoopDelayConfig);

this.onTaskEvent(
asTaskManagerStatEvent(
'runDelay',
Expand Down Expand Up @@ -411,11 +425,12 @@ export class TaskManagerRunner implements TaskRunner {
private validateTaskState(taskInstance: ConcreteTaskInstance) {
const { taskType, id } = taskInstance;
try {
const validatedTask = this.taskValidator.getValidatedTaskInstanceFromReading(taskInstance);
return validatedTask;
const validatedTaskInstance =
this.taskValidator.getValidatedTaskInstanceFromReading(taskInstance);
return { taskInstance: validatedTaskInstance, error: null };
} catch (error) {
this.logger.warn(`Task (${taskType}/${id}) has a validation error: ${error.message}`);
throw error;
return { taskInstance, error };
}
}

Expand Down Expand Up @@ -723,6 +738,7 @@ export class TaskManagerRunner implements TaskRunner {
this.instance = asRan(this.instance.task);
await this.removeTask();
} else {
const { shouldValidate = true } = unwrap(result);
this.instance = asRan(
await this.bufferedTaskStore.update(
defaults(
Expand All @@ -735,7 +751,7 @@ export class TaskManagerRunner implements TaskRunner {
},
taskWithoutEnabled(this.instance.task)
),
{ validate: true }
{ validate: shouldValidate }
)
);
}
Expand Down

0 comments on commit e20e659

Please sign in to comment.