Skip to content

Commit

Permalink
[Alerting] Handle when an Alerting Task fails due to its Alert object…
Browse files Browse the repository at this point in the history
… being deleted mid flight (#63093) (#63569)

Detects if a task run failed due to the task SO being deleted mid flight and if so writes debug logs instead of warnings.

Detects if an Alerting task run failed due to the alert SO being deleted mid flight of the task and if so ensures the task doesn't reschedule itself (as it usually would with other types of tasks).

Ensures that the operation of deleting or disabling an Alert won't fail if it fails to delete an already deleted task (a task might preemptively self delete if its underlying alert object was deleted, even if the overall delete operation wasn't deleted).

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
gmmorris and elasticmachine authored Apr 17, 2020
1 parent b9822b1 commit 3cdf259
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 21 deletions.
7 changes: 5 additions & 2 deletions x-pack/plugins/alerting/server/alerts_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
import { EncryptedSavedObjectsPluginStart } from '../../../plugins/encrypted_saved_objects/server';
import { TaskManagerStartContract } from '../../../plugins/task_manager/server';
import { taskInstanceToAlertTaskInstance } from './task_runner/alert_task_instance';
import { deleteTaskIfItExists } from './lib/delete_task_if_it_exists';

type NormalizedAlertAction = Omit<AlertAction, 'actionTypeId'>;
export type CreateAPIKeyResult =
Expand Down Expand Up @@ -263,7 +264,7 @@ export class AlertsClient {
const removeResult = await this.savedObjectsClient.delete('alert', id);

await Promise.all([
taskIdToRemove ? this.taskManager.remove(taskIdToRemove) : null,
taskIdToRemove ? deleteTaskIfItExists(this.taskManager, taskIdToRemove) : null,
apiKeyToInvalidate ? this.invalidateApiKey({ apiKey: apiKeyToInvalidate }) : null,
]);

Expand Down Expand Up @@ -505,7 +506,9 @@ export class AlertsClient {
);

await Promise.all([
attributes.scheduledTaskId ? this.taskManager.remove(attributes.scheduledTaskId) : null,
attributes.scheduledTaskId
? deleteTaskIfItExists(this.taskManager, attributes.scheduledTaskId)
: null,
apiKeyToInvalidate ? this.invalidateApiKey({ apiKey: apiKeyToInvalidate }) : null,
]);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import uuid from 'uuid';
import { taskManagerMock } from '../../../task_manager/server/mocks';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import { deleteTaskIfItExists } from './delete_task_if_it_exists';

describe('deleteTaskIfItExists', () => {
test('removes the task by its ID', async () => {
const tm = taskManagerMock.createStart();
const id = uuid.v4();

expect(await deleteTaskIfItExists(tm, id)).toBe(undefined);

expect(tm.remove).toHaveBeenCalledWith(id);
});

test('handles 404 errors caused by the task not existing', async () => {
const tm = taskManagerMock.createStart();
const id = uuid.v4();

tm.remove.mockRejectedValue(SavedObjectsErrorHelpers.createGenericNotFoundError('task', id));

expect(await deleteTaskIfItExists(tm, id)).toBe(undefined);

expect(tm.remove).toHaveBeenCalledWith(id);
});

test('throws if any other errro is caused by task removal', async () => {
const tm = taskManagerMock.createStart();
const id = uuid.v4();

const error = SavedObjectsErrorHelpers.createInvalidVersionError(uuid.v4());
tm.remove.mockRejectedValue(error);

expect(deleteTaskIfItExists(tm, id)).rejects.toBe(error);

expect(tm.remove).toHaveBeenCalledWith(id);
});
});
17 changes: 17 additions & 0 deletions x-pack/plugins/alerting/server/lib/delete_task_if_it_exists.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { TaskManagerStartContract } from '../../../task_manager/server';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';

export async function deleteTaskIfItExists(taskManager: TaskManagerStartContract, taskId: string) {
try {
await taskManager.remove(taskId);
} catch (err) {
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {
throw err;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { isAlertSavedObjectNotFoundError } from './is_alert_not_found_error';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import uuid from 'uuid';

describe('isAlertSavedObjectNotFoundError', () => {
test('identifies SavedObjects Not Found errors', () => {
const id = uuid.v4();
// ensure the error created by SO parses as a string with the format we expect
expect(
`${SavedObjectsErrorHelpers.createGenericNotFoundError('alert', id)}`.includes(`alert/${id}`)
).toBe(true);

const errorBySavedObjectsHelper = SavedObjectsErrorHelpers.createGenericNotFoundError(
'alert',
id
);

expect(isAlertSavedObjectNotFoundError(errorBySavedObjectsHelper, id)).toBe(true);
});

test('identifies generic errors', () => {
const id = uuid.v4();
expect(isAlertSavedObjectNotFoundError(new Error(`not found`), id)).toBe(false);
});
});
11 changes: 11 additions & 0 deletions x-pack/plugins/alerting/server/lib/is_alert_not_found_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';

export function isAlertSavedObjectNotFoundError(err: Error, alertId: string) {
return SavedObjectsErrorHelpers.isNotFoundError(err) && `${err}`.includes(alertId);
}
33 changes: 33 additions & 0 deletions x-pack/plugins/alerting/server/task_runner/task_runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { encryptedSavedObjectsMock } from '../../../../plugins/encrypted_saved_o
import { savedObjectsClientMock, loggingServiceMock } from '../../../../../src/core/server/mocks';
import { PluginStartContract as ActionsPluginStart } from '../../../actions/server';
import { actionsMock } from '../../../actions/server/mocks';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';

const alertType = {
id: 'test',
Expand Down Expand Up @@ -501,4 +502,36 @@ describe('Task Runner', () => {
}
`);
});

test('avoids rescheduling a failed Alert Task Runner when it throws due to failing to fetch the alert', async () => {
savedObjectsClient.get.mockImplementation(() => {
throw SavedObjectsErrorHelpers.createGenericNotFoundError('task', '1');
});

const taskRunner = new TaskRunner(
alertType,
mockedTaskInstance,
taskRunnerFactoryInitializerParams
);

encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '1',
type: 'alert',
attributes: {
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [],
});

const runnerResult = await taskRunner.run();

expect(runnerResult).toMatchInlineSnapshot(`
Object {
"runAt": undefined,
"state": Object {
"previousStartedAt": 1970-01-01T00:00:00.000Z,
},
}
`);
});
});
30 changes: 19 additions & 11 deletions x-pack/plugins/alerting/server/task_runner/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import {
import { promiseResult, map, Resultable, asOk, asErr, resolveErr } from '../lib/result_type';
import { taskInstanceToAlertTaskInstance } from './alert_task_instance';
import { AlertInstances } from '../alert_instance/alert_instance';
import { isAlertSavedObjectNotFoundError } from '../lib/is_alert_not_found_error';

const FALLBACK_RETRY_INTERVAL: IntervalSchedule = { interval: '5m' };

interface AlertTaskRunResult {
state: AlertTaskState;
runAt: Date;
runAt: Date | undefined;
}

interface AlertTaskInstance extends ConcreteTaskInstance {
Expand Down Expand Up @@ -293,22 +294,29 @@ export class TaskRunner {
};
},
(err: Error) => {
this.logger.error(`Executing Alert "${alertId}" has resulted in Error: ${err.message}`);
const message = `Executing Alert "${alertId}" has resulted in Error: ${err.message}`;
if (isAlertSavedObjectNotFoundError(err, alertId)) {
this.logger.debug(message);
} else {
this.logger.error(message);
}
return {
...originalState,
previousStartedAt,
};
}
),
runAt: resolveErr<Date, Error>(runAt, () =>
getNextRunAt(
new Date(),
// if we fail at this point we wish to recover but don't have access to the Alert's
// attributes, so we'll use a default interval to prevent the underlying task from
// falling into a failed state
FALLBACK_RETRY_INTERVAL
)
),
runAt: resolveErr<Date | undefined, Error>(runAt, err => {
return isAlertSavedObjectNotFoundError(err, alertId)
? undefined
: getNextRunAt(
new Date(),
// if we fail at this point we wish to recover but don't have access to the Alert's
// attributes, so we'll use a default interval to prevent the underlying task from
// falling into a failed state
FALLBACK_RETRY_INTERVAL
);
}),
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
* you may not use this file except in compliance with the Elastic License.
*/

import moment from 'moment';
import {
getMockCallWithInternal,
getMockConfig,
Expand All @@ -13,6 +12,7 @@ import {
} from '../../../test_utils';
import { visualizationsTaskRunner } from './task_runner';
import { TaskInstance } from '../../../../../task_manager/server';
import { getNextMidnight } from '../../get_next_midnight';

describe('visualizationsTaskRunner', () => {
let mockTaskInstance: TaskInstance;
Expand Down Expand Up @@ -41,12 +41,6 @@ describe('visualizationsTaskRunner', () => {
});

test('Summarizes visualization response data', async () => {
const getNextMidnight = () =>
moment()
.add(1, 'days')
.startOf('day')
.toDate();

const runner = visualizationsTaskRunner(mockTaskInstance, getMockConfig(), getMockEs());
const result = await runner();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { isTaskSavedObjectNotFoundError } from './is_task_not_found_error';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import uuid from 'uuid';

describe('isTaskSavedObjectNotFoundError', () => {
test('identifies SavedObjects Not Found errors', () => {
const id = uuid.v4();
// ensure the error created by SO parses as a string with the format we expect
expect(
`${SavedObjectsErrorHelpers.createGenericNotFoundError('task', id)}`.includes(`task/${id}`)
).toBe(true);

const errorBySavedObjectsHelper = SavedObjectsErrorHelpers.createGenericNotFoundError(
'task',
id
);

expect(isTaskSavedObjectNotFoundError(errorBySavedObjectsHelper, id)).toBe(true);
});

test('identifies generic errors', () => {
const id = uuid.v4();
expect(isTaskSavedObjectNotFoundError(new Error(`not found`), id)).toBe(false);
});
});
11 changes: 11 additions & 0 deletions x-pack/plugins/task_manager/server/lib/is_task_not_found_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';

export function isTaskSavedObjectNotFoundError(err: Error, taskId: string) {
return SavedObjectsErrorHelpers.isNotFoundError(err) && `${err}`.includes(taskId);
}
25 changes: 25 additions & 0 deletions x-pack/plugins/task_manager/server/task_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import sinon from 'sinon';
import { TaskPool, TaskPoolRunResult } from './task_pool';
import { mockLogger, resolvable, sleep } from './test_utils';
import { asOk } from './lib/result_type';
import { SavedObjectsErrorHelpers } from '../../../../src/core/server';

describe('TaskPool', () => {
test('occupiedWorkers are a sum of running tasks', async () => {
Expand Down Expand Up @@ -101,6 +102,30 @@ describe('TaskPool', () => {
expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
});

test('should not log when running a Task fails due to the Task SO having been deleted while in flight', async () => {
const logger = mockLogger();
const pool = new TaskPool({
maxWorkers: 3,
logger,
});

const taskFailedToRun = mockTask();
taskFailedToRun.run.mockImplementation(async () => {
throw SavedObjectsErrorHelpers.createGenericNotFoundError('task', taskFailedToRun.id);
});

const result = await pool.run([mockTask(), taskFailedToRun, mockTask()]);

expect(logger.debug.mock.calls[0]).toMatchInlineSnapshot(`
Array [
"Task TaskType \\"shooooo\\" failed in attempt to run: Saved object [task/foo] not found",
]
`);
expect(logger.warn).not.toHaveBeenCalled();

expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
});

test('Running a task which fails still takes up capacity', async () => {
const logger = mockLogger();
const pool = new TaskPool({
Expand Down
13 changes: 12 additions & 1 deletion x-pack/plugins/task_manager/server/task_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import { performance } from 'perf_hooks';
import { Logger } from './types';
import { TaskRunner } from './task_runner';
import { isTaskSavedObjectNotFoundError } from './lib/is_task_not_found_error';

interface Opts {
maxWorkers: number;
Expand Down Expand Up @@ -125,7 +126,17 @@ export class TaskPool {
taskRunner
.run()
.catch(err => {
this.logger.warn(`Task ${taskRunner.toString()} failed in attempt to run: ${err.message}`);
// If a task Saved Object can't be found by an in flight task runner
// we asssume the underlying task has been deleted while it was running
// so we will log this as a debug, rather than a warn
const errorLogLine = `Task ${taskRunner.toString()} failed in attempt to run: ${
err.message
}`;
if (isTaskSavedObjectNotFoundError(err, taskRunner.id)) {
this.logger.debug(errorLogLine);
} else {
this.logger.warn(errorLogLine);
}
})
.then(() => this.running.delete(taskRunner));
}
Expand Down

0 comments on commit 3cdf259

Please sign in to comment.