Skip to content

Commit

Permalink
[Response Ops][Task Manager] Setting task status directly to `running…
Browse files Browse the repository at this point in the history
…` in `mget` claim strategy (elastic#191669)

Resolves elastic#184739

## Summary

During the `mget` task claim strategy, we set the task directly to
`running` with the appropriate `retryAt` value instead of setting it to
`claiming` and letting the task runner set the task to `running`. This
removes a task update from the claiming process. Updates the task runner
to skip the `markTaskAsRunning` function if the claim strategy is `mget`
and move the task directly to "ready to run".

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
ymao1 and elasticmachine authored Sep 3, 2024
1 parent c9c9054 commit ba0485e
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 166 deletions.
89 changes: 89 additions & 0 deletions x-pack/plugins/task_manager/server/lib/get_retry_at.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import sinon from 'sinon';
import { calculateDelayBasedOnAttempts, getRetryDate } from './get_retry_at';
import { createRetryableError } from '../task_running';

let fakeTimer: sinon.SinonFakeTimers;

describe('calculateDelayBasedOnAttempts', () => {
it('returns 30s on the first attempt', () => {
expect(calculateDelayBasedOnAttempts(1)).toBe(30000);
});

it('returns delay with jitter', () => {
const delay = calculateDelayBasedOnAttempts(5);
// with jitter should be random between 0 and 40 min (inclusive)
expect(delay).toBeGreaterThanOrEqual(0);
expect(delay).toBeLessThanOrEqual(2400000);
});

it('returns delay capped at 1 hour', () => {
const delay = calculateDelayBasedOnAttempts(10);
// with jitter should be random between 0 and 1 hr (inclusive)
expect(delay).toBeGreaterThanOrEqual(0);
expect(delay).toBeLessThanOrEqual(60 * 60 * 1000);
});
});

describe('getRetryDate', () => {
beforeAll(() => {
fakeTimer = sinon.useFakeTimers(new Date('2021-01-01T12:00:00.000Z'));
});

afterAll(() => fakeTimer.restore());

it('returns retry date based on number of attempts if error is not retryable', () => {
expect(getRetryDate({ error: new Error('foo'), attempts: 1 })).toEqual(
new Date('2021-01-01T12:00:30.000Z')
);
});

it('returns retry date based on number of attempts and add duration if error is not retryable', () => {
expect(getRetryDate({ error: new Error('foo'), attempts: 1, addDuration: '5m' })).toEqual(
new Date('2021-01-01T12:05:30.000Z')
);
});

it('returns retry date for retryable error with retry date', () => {
expect(
getRetryDate({
error: createRetryableError(new Error('foo'), new Date('2021-02-01T12:00:00.000Z')),
attempts: 1,
})
).toEqual(new Date('2021-02-01T12:00:00.000Z'));
});

it('returns retry date based on number of attempts for retryable error with retry=true', () => {
expect(
getRetryDate({
error: createRetryableError(new Error('foo'), true),
attempts: 1,
})
).toEqual(new Date('2021-01-01T12:00:30.000Z'));
});

it('returns retry date based on number of attempts and add duration for retryable error with retry=true', () => {
expect(
getRetryDate({
error: createRetryableError(new Error('foo'), true),
attempts: 1,
addDuration: '5m',
})
).toEqual(new Date('2021-01-01T12:05:30.000Z'));
});

it('returns undefined for retryable error with retry=false', () => {
expect(
getRetryDate({
error: createRetryableError(new Error('foo'), false),
attempts: 1,
})
).toBeUndefined();
});
});
79 changes: 79 additions & 0 deletions x-pack/plugins/task_manager/server/lib/get_retry_at.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { random } from 'lodash';
import { ConcreteTaskInstance, DEFAULT_TIMEOUT, TaskDefinition } from '../task';
import { isRetryableError } from '../task_running';
import { intervalFromDate, maxIntervalFromDate } from './intervals';

export function getRetryAt(
task: ConcreteTaskInstance,
taskDefinition: TaskDefinition | undefined
): Date | undefined {
const taskTimeout = getTimeout(task, taskDefinition);
if (task.schedule) {
return maxIntervalFromDate(new Date(), task.schedule.interval, taskTimeout);
}

return getRetryDate({
attempts: task.attempts + 1,
// Fake an error. This allows retry logic when tasks keep timing out
// and lets us set a proper "retryAt" value each time.
error: new Error('Task timeout'),
addDuration: taskTimeout,
});
}

export function getRetryDate({
error,
attempts,
addDuration,
}: {
error: Error;
attempts: number;
addDuration?: string;
}): Date | undefined {
const retry: boolean | Date = isRetryableError(error) ?? true;

let result;
if (retry instanceof Date) {
result = retry;
} else if (retry === true) {
result = new Date(Date.now() + calculateDelayBasedOnAttempts(attempts));
}

// Add a duration to the result
if (addDuration && result) {
result = intervalFromDate(result, addDuration)!;
}
return result;
}

export function calculateDelayBasedOnAttempts(attempts: number) {
// Return 30s for the first retry attempt
if (attempts === 1) {
return 30 * 1000;
} else {
const defaultBackoffPerFailure = 5 * 60 * 1000;
const maxDelay = 60 * 60 * 1000;
// For each remaining attempt return an exponential delay with jitter that is capped at 1 hour.
// We adjust the attempts by 2 to ensure that delay starts at 5m for the second retry attempt
// and increases exponentially from there.
return random(Math.min(maxDelay, defaultBackoffPerFailure * Math.pow(2, attempts - 2)));
}
}

export function getTimeout(
task: ConcreteTaskInstance,
taskDefinition: TaskDefinition | undefined
): string {
if (task.schedule) {
return taskDefinition?.timeout ?? DEFAULT_TIMEOUT;
}

return task.timeoutOverride ? task.timeoutOverride : taskDefinition?.timeout ?? DEFAULT_TIMEOUT;
}
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
usageCounter: this.usageCounter,
eventLoopDelayConfig: { ...this.config.event_loop_delay },
allowReadingInvalidState: this.config.allow_reading_invalid_state,
strategy: this.config.claim_strategy,
});
};

Expand Down
Loading

0 comments on commit ba0485e

Please sign in to comment.