From 0f42e78867446b035e9758eaf8b432ab49519780 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20C=C3=B4t=C3=A9?= Date: Fri, 9 Aug 2024 09:32:59 -0400 Subject: [PATCH] Make mget task claim strategy poll more frequently (#190059) In this PR, I'm changing the default `poll_interval` to `500` whenever the mget claim strategy is set. This keeps the `3000` value for the default task claiming strategy, and an explicit `xpack.task_manager.poll_interval` setting still takes precedence. I'm increasing the frequency because we no longer get as much contention when adding Kibana nodes, allowing tasks to be picked up more frequently / on time. ## To verify 1. Add a console log to the task manager plugin constructor (like https://github.com/elastic/kibana/pull/190059/commits/1e6b1ac0ec1fc739e3ae63ea577be8ea96d89454) 2. Start up this PR without any task manager yml settings 3. Observe `3000` is the poll interval 4. Set `xpack.task_manager.claim_strategy: 'unsafe_mget'` in the kibana.yml file and wait for Kibana restart 5. Observe `500` is the poll interval 6. Set `xpack.task_manager.poll_interval: 3001` and wait for Kibana restart 7. Observe `3001` is the poll interval 8. Remove the `xpack.task_manager.claim_strategy` setting and wait for Kibana restart 9. Observe `3001` is still the poll interval 10. 
Remove `xpack.task_manager.poll_interval` setting Flaky test runner for mget tests: https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/6722 --------- Co-authored-by: Elastic Machine --- .../plugins/task_manager/server/config.test.ts | 12 +++++++++++- x-pack/plugins/task_manager/server/config.ts | 17 +++++++++++++---- .../test_suites/task_manager/health_route.ts | 2 +- .../test_suites/task_manager/metrics_route.ts | 18 ++++++++++++------ 4 files changed, 37 insertions(+), 12 deletions(-) diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index 81e9e24ea4586..0268d61c9b975 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { configSchema } from './config'; +import { configSchema, CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET } from './config'; describe('config validation', () => { test('task manager defaults', () => { @@ -242,4 +242,14 @@ describe('config validation', () => { test('any claim strategy is valid', () => { configSchema.validate({ claim_strategy: 'anything!' 
}); }); + + test('default claim strategy defaults poll interval to 3000ms', () => { + const result = configSchema.validate({ claim_strategy: CLAIM_STRATEGY_DEFAULT }); + expect(result.poll_interval).toEqual(3000); + }); + + test('mget claim strategy defaults poll interval to 500ms', () => { + const result = configSchema.validate({ claim_strategy: CLAIM_STRATEGY_MGET }); + expect(result.poll_interval).toEqual(500); + }); }); diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index f0f4031a4c8ac..3e5da0df9be99 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -13,6 +13,7 @@ export const MAX_CAPACITY = 50; export const MIN_CAPACITY = 5; export const DEFAULT_MAX_WORKERS = 10; export const DEFAULT_POLL_INTERVAL = 3000; +export const MGET_DEFAULT_POLL_INTERVAL = 500; export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80; export const DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY = MAX_WORKERS_LIMIT; @@ -133,10 +134,18 @@ export const configSchema = schema.object( default: taskExecutionFailureThresholdSchema, }), /* How often, in milliseconds, the task manager will look for more work. */ - poll_interval: schema.number({ - defaultValue: DEFAULT_POLL_INTERVAL, - min: 100, - }), + poll_interval: schema.conditional( + schema.siblingRef('claim_strategy'), + CLAIM_STRATEGY_MGET, + schema.number({ + defaultValue: MGET_DEFAULT_POLL_INTERVAL, + min: 100, + }), + schema.number({ + defaultValue: DEFAULT_POLL_INTERVAL, + min: 100, + }) + ), /* How many requests can Task Manager buffer before it rejects new requests. 
*/ request_capacity: schema.number({ // a nice round contrived number, feel free to change as we learn how it behaves diff --git a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/health_route.ts b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/health_route.ts index aa4f68e1fedd8..7b536025715f0 100644 --- a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/health_route.ts +++ b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/health_route.ts @@ -129,7 +129,7 @@ export default function ({ getService }: FtrProviderContext) { const health = await getHealth(); expect(health.status).to.eql('OK'); expect(health.stats.configuration.value).to.eql({ - poll_interval: 3000, + poll_interval: 500, monitored_aggregated_stats_refresh_rate: monitoredAggregatedStatsRefreshRate, monitored_stats_running_average_window: 50, monitored_task_execution_thresholds: { diff --git a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/metrics_route.ts b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/metrics_route.ts index 4fad194abf368..3e37c488e5190 100644 --- a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/metrics_route.ts +++ b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/metrics_route.ts @@ -52,7 +52,10 @@ export default function ({ getService }: FtrProviderContext) { // counters are reset every 30 seconds, so wait until the start of a // fresh counter cycle to make sure values are incrementing const initialMetrics = ( - await getMetrics(false, (metrics) => metrics?.metrics?.task_claim?.value.total === 1) + await getMetrics( + false, + (metrics) => (metrics?.metrics?.task_claim?.value.total || 0) >= 1 + ) ).metrics; expect(initialMetrics).not.to.be(null); expect(initialMetrics?.task_claim).not.to.be(null); @@ -92,7 +95,7 @@ export default function ({ getService }: FtrProviderContext) { const initialMetrics = ( await getMetrics( false, - (metrics) => 
metrics?.metrics?.task_claim?.value.total === initialCounterValue + (metrics) => (metrics?.metrics?.task_claim?.value.total || 0) >= initialCounterValue ) ).metrics; expect(initialMetrics).not.to.be(null); @@ -101,7 +104,10 @@ export default function ({ getService }: FtrProviderContext) { // retry until counter value resets const resetMetrics = ( - await getMetrics(false, (m: NodeMetrics) => m?.metrics?.task_claim?.value.total === 1) + await getMetrics( + false, + (m: NodeMetrics) => (m?.metrics?.task_claim?.value.total || 0) >= 1 + ) ).metrics; expect(resetMetrics).not.to.be(null); expect(resetMetrics?.task_claim).not.to.be(null); @@ -113,7 +119,7 @@ export default function ({ getService }: FtrProviderContext) { const initialMetrics = ( await getMetrics( false, - (metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue + (metrics) => (metrics?.metrics?.task_claim?.value.total || 0) >= initialCounterValue ) ).metrics; expect(initialMetrics).not.to.be(null); @@ -133,8 +139,8 @@ export default function ({ getService }: FtrProviderContext) { expect(metrics?.task_claim).not.to.be(null); expect(metrics?.task_claim?.value).not.to.be(null); - expect(metrics?.task_claim?.value.success).to.equal(1); - expect(metrics?.task_claim?.value.total).to.equal(1); + expect(metrics?.task_claim?.value.success).to.be.greaterThan(0); + expect(metrics?.task_claim?.value.total).to.be.greaterThan(0); previousTaskClaimTimestamp = metrics?.task_claim?.timestamp!;