Skip to content

Commit

Permalink
Make mget task claim strategy poll more frequently (elastic#190059)
Browse files Browse the repository at this point in the history
In this PR, I'm changing the default `poll_interval` to `500` whenever
mget claim strategy is set. This keeps the `3000` value for the default
task claiming strategy and setting `xpack.task_manager.poll_interval`
still takes precedence.

I'm increasing the frequency because we no longer get as much contention
when adding Kibana nodes, allowing tasks to be picked up more frequently
/ on-time.

## To verify

1. Add a console log to the task manager plugin constructor (like
elastic@1e6b1ac)
2. Startup this PR without any task manager yml settings
3. Observe `3000` is the poll interval
4. Set `xpack.task_manager.claim_strategy: 'unsafe_mget'` in the
kibana.yml file and wait for Kibana restart
5. Observe `500` is the poll interval
6. Set `xpack.task_manager.poll_interval: 3001` and wait for Kibana
restart
7. Observe `3001` is the poll interval
8. Remove `xpack.task_manager.claim_strategy` setting and wait for
Kibana restart
9. Observe `3001` is still poll interval
10. Remove `xpack.task_manager.poll_interval` setting

Flaky test runner for mget tests:
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/6722

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
mikecote and elasticmachine authored Aug 9, 2024
1 parent 813025c commit 0f42e78
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 12 deletions.
12 changes: 11 additions & 1 deletion x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { configSchema } from './config';
import { configSchema, CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET } from './config';

describe('config validation', () => {
test('task manager defaults', () => {
Expand Down Expand Up @@ -242,4 +242,14 @@ describe('config validation', () => {
test('any claim strategy is valid', () => {
configSchema.validate({ claim_strategy: 'anything!' });
});

test('default claim strategy defaults poll interval to 3000ms', () => {
const result = configSchema.validate({ claim_strategy: CLAIM_STRATEGY_DEFAULT });
expect(result.poll_interval).toEqual(3000);
});

test('mget claim strategy defaults poll interval to 500ms', () => {
const result = configSchema.validate({ claim_strategy: CLAIM_STRATEGY_MGET });
expect(result.poll_interval).toEqual(500);
});
});
17 changes: 13 additions & 4 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export const MAX_CAPACITY = 50;
export const MIN_CAPACITY = 5;
export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const MGET_DEFAULT_POLL_INTERVAL = 500;
export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80;
export const DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY = MAX_WORKERS_LIMIT;

Expand Down Expand Up @@ -133,10 +134,18 @@ export const configSchema = schema.object(
default: taskExecutionFailureThresholdSchema,
}),
/* How often, in milliseconds, the task manager will look for more work. */
poll_interval: schema.number({
defaultValue: DEFAULT_POLL_INTERVAL,
min: 100,
}),
poll_interval: schema.conditional(
schema.siblingRef('claim_strategy'),
CLAIM_STRATEGY_MGET,
schema.number({
defaultValue: MGET_DEFAULT_POLL_INTERVAL,
min: 100,
}),
schema.number({
defaultValue: DEFAULT_POLL_INTERVAL,
min: 100,
})
),
/* How many requests can Task Manager buffer before it rejects new requests. */
request_capacity: schema.number({
// a nice round contrived number, feel free to change as we learn how it behaves
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ export default function ({ getService }: FtrProviderContext) {
const health = await getHealth();
expect(health.status).to.eql('OK');
expect(health.stats.configuration.value).to.eql({
poll_interval: 3000,
poll_interval: 500,
monitored_aggregated_stats_refresh_rate: monitoredAggregatedStatsRefreshRate,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ export default function ({ getService }: FtrProviderContext) {
// counters are reset every 30 seconds, so wait until the start of a
// fresh counter cycle to make sure values are incrementing
const initialMetrics = (
await getMetrics(false, (metrics) => metrics?.metrics?.task_claim?.value.total === 1)
await getMetrics(
false,
(metrics) => (metrics?.metrics?.task_claim?.value.total || 0) >= 1
)
).metrics;
expect(initialMetrics).not.to.be(null);
expect(initialMetrics?.task_claim).not.to.be(null);
Expand Down Expand Up @@ -92,7 +95,7 @@ export default function ({ getService }: FtrProviderContext) {
const initialMetrics = (
await getMetrics(
false,
(metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue
(metrics) => (metrics?.metrics?.task_claim?.value.total || 0) >= initialCounterValue
)
).metrics;
expect(initialMetrics).not.to.be(null);
Expand All @@ -101,7 +104,10 @@ export default function ({ getService }: FtrProviderContext) {

// retry until counter value resets
const resetMetrics = (
await getMetrics(false, (m: NodeMetrics) => m?.metrics?.task_claim?.value.total === 1)
await getMetrics(
false,
(m: NodeMetrics) => (m?.metrics?.task_claim?.value.total || 0) >= 1
)
).metrics;
expect(resetMetrics).not.to.be(null);
expect(resetMetrics?.task_claim).not.to.be(null);
Expand All @@ -113,7 +119,7 @@ export default function ({ getService }: FtrProviderContext) {
const initialMetrics = (
await getMetrics(
false,
(metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue
(metrics) => (metrics?.metrics?.task_claim?.value.total || 0) >= initialCounterValue
)
).metrics;
expect(initialMetrics).not.to.be(null);
Expand All @@ -133,8 +139,8 @@ export default function ({ getService }: FtrProviderContext) {
expect(metrics?.task_claim).not.to.be(null);
expect(metrics?.task_claim?.value).not.to.be(null);

expect(metrics?.task_claim?.value.success).to.equal(1);
expect(metrics?.task_claim?.value.total).to.equal(1);
expect(metrics?.task_claim?.value.success).to.be.greaterThan(0);
expect(metrics?.task_claim?.value.total).to.be.greaterThan(0);

previousTaskClaimTimestamp = metrics?.task_claim?.timestamp!;

Expand Down

0 comments on commit 0f42e78

Please sign in to comment.