From 240e5ef10c6f5763724510b727ce07c82fe15498 Mon Sep 17 00:00:00 2001 From: Julia Bardi <90178898+juliaElastic@users.noreply.github.com> Date: Fri, 9 Feb 2024 10:39:57 +0100 Subject: [PATCH] [Fleet] Make datastream rollover lazy (#174790) (#176565) ## Summary Add back changes in https://github.com/elastic/kibana/pull/174790 after https://github.com/elastic/elasticsearch/issues/104732 is fixed Resolve https://github.com/elastic/kibana/issues/174480 Co-authored-by: Nicolas Chaulet --- .../elasticsearch/template/template.test.ts | 9 +- .../epm/elasticsearch/template/template.ts | 8 +- .../apis/epm/data_stream.ts | 96 +++++++++++-------- .../apis/epm/install_hidden_datastreams.ts | 80 +++++++++------- 4 files changed, 113 insertions(+), 80 deletions(-) diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.test.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.test.ts index f680a0bf004a6..58bcfcca386cf 100644 --- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.test.ts @@ -1673,7 +1673,14 @@ describe('EPM template', () => { }, ]); - expect(esClient.indices.rollover).toHaveBeenCalled(); + expect(esClient.transport.request).toHaveBeenCalledWith( + expect.objectContaining({ + path: '/test.prefix1-default/_rollover', + querystring: { + lazy: true, + }, + }) + ); }); it('should skip rollover on expected error when flag is on', async () => { const esClient = elasticsearchServiceMock.createElasticsearchClient(); diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.ts index da2b801548e18..01b1792dc5e79 100644 --- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.ts @@ -946,8 +946,12 @@ const getDataStreams = async ( const rolloverDataStream = (dataStreamName: string, esClient: ElasticsearchClient) => { try { // Do no wrap rollovers in retryTransientEsErrors since it is not idempotent - return esClient.indices.rollover({ - alias: dataStreamName, + return esClient.transport.request({ + method: 'POST', + path: `/${dataStreamName}/_rollover`, + querystring: { + lazy: true, + }, }); } catch (error) { throw new PackageESError( diff --git a/x-pack/test/fleet_api_integration/apis/epm/data_stream.ts b/x-pack/test/fleet_api_integration/apis/epm/data_stream.ts index 06a67a13e425c..a257ff97933d9 100644 --- a/x-pack/test/fleet_api_integration/apis/epm/data_stream.ts +++ b/x-pack/test/fleet_api_integration/apis/epm/data_stream.ts @@ -41,43 +41,46 @@ export default function (providerContext: FtrProviderContext) { skipIfNoDockerRegistry(providerContext); setupFleetAndAgents(providerContext); + const writeMetricsDoc = (namespace: string) => + es.transport.request( + { + method: 'POST', + path: `/${metricsTemplateName}-${namespace}/_doc?refresh=true`, + body: { + '@timestamp': new Date().toISOString(), + logs_test_name: 'test', + data_stream: { + dataset: `${pkgName}.test_metrics`, + namespace, + type: 'metrics', + }, + }, + }, + { meta: true } + ); + + const writeLogsDoc = (namespace: string) => + es.transport.request( + { + method: 'POST', + path: `/${logsTemplateName}-${namespace}/_doc?refresh=true`, + body: { + '@timestamp': new Date().toISOString(), + logs_test_name: 'test', + data_stream: { + dataset: `${pkgName}.test_logs`, + namespace, + type: 'logs', + }, + }, + }, + { meta: true } + ); beforeEach(async () => { await installPackage(pkgName, pkgVersion); await Promise.all( namespaces.map(async (namespace) => { - const createLogsRequest = es.transport.request( - { - method: 'POST', - path: `/${logsTemplateName}-${namespace}/_doc`, - body: { - '@timestamp': '2015-01-01', - logs_test_name: 'test', - data_stream: { - dataset: `${pkgName}.test_logs`, - namespace, - type: 'logs', - }, - }, - }, - { meta: true } - ); - const createMetricsRequest = es.transport.request( - { - method: 'POST', - path: `/${metricsTemplateName}-${namespace}/_doc`, - body: { - '@timestamp': '2015-01-01', - logs_test_name: 'test', - data_stream: { - dataset: `${pkgName}.test_metrics`, - namespace, - type: 'metrics', - }, - }, - }, - { meta: true } - ); - return Promise.all([createLogsRequest, createMetricsRequest]); + return Promise.all([writeLogsDoc(namespace), writeMetricsDoc(namespace)]); }) ); }); @@ -141,7 +144,11 @@ export default function (providerContext: FtrProviderContext) { it('after update, it should have rolled over logs datastream because mappings are not compatible and not metrics', async function () { await installPackage(pkgName, pkgUpdateVersion); + await asyncForEach(namespaces, async (namespace) => { + // write doc as rollover is lazy + await writeLogsDoc(namespace); + await writeMetricsDoc(namespace); const resLogsDatastream = await es.transport.request( { method: 'GET', @@ -266,6 +273,8 @@ export default function (providerContext: FtrProviderContext) { }) .expect(200); + // Write a doc to trigger lazy rollover + await writeLogsDoc('default'); // Datastream should have been rolled over expect(await getLogsDefaultBackingIndicesLength()).to.be(2); }); @@ -303,26 +312,29 @@ export default function (providerContext: FtrProviderContext) { skipIfNoDockerRegistry(providerContext); setupFleetAndAgents(providerContext); - beforeEach(async () => { - await installPackage(pkgName, pkgVersion); - - // Create a sample document so the data stream is created - await es.transport.request( + const writeMetricDoc = (body: any = {}) => + es.transport.request( { method: 'POST', - path: `/${metricsTemplateName}-${namespace}/_doc`, + path: `/${metricsTemplateName}-${namespace}/_doc?refresh=true`, body: { - '@timestamp': '2015-01-01', + '@timestamp': new Date().toISOString(), logs_test_name: 'test', data_stream: { dataset: `${pkgName}.test_logs`, namespace, type: 'logs', }, + ...body, }, }, { meta: true } ); + beforeEach(async () => { + await installPackage(pkgName, pkgVersion); + + // Create a sample document so the data stream is created + await writeMetricDoc(); }); afterEach(async () => { @@ -340,6 +352,10 @@ export default function (providerContext: FtrProviderContext) { it('rolls over data stream when index_mode: time_series is set in the updated package version', async () => { await installPackage(pkgName, pkgUpdateVersion); + // Write a doc so lazy rollover can happen + await writeMetricDoc({ + some_field: 'test', + }); const resMetricsDatastream = await es.transport.request( { method: 'GET', diff --git a/x-pack/test/fleet_api_integration/apis/epm/install_hidden_datastreams.ts b/x-pack/test/fleet_api_integration/apis/epm/install_hidden_datastreams.ts index 2fe976352944a..2ec6fb92000e3 100644 --- a/x-pack/test/fleet_api_integration/apis/epm/install_hidden_datastreams.ts +++ b/x-pack/test/fleet_api_integration/apis/epm/install_hidden_datastreams.ts @@ -34,46 +34,50 @@ export default function (providerContext: FtrProviderContext) { .send({ force: true }) .expect(200); - await es.index({ - index: 'metrics-apm.service_summary.10m-default', - document: { - '@timestamp': '2023-05-30T07:50:00.000Z', - agent: { - name: 'go', - }, - data_stream: { - dataset: 'apm.service_summary.10m', - namespace: 'default', - type: 'metrics', - }, - ecs: { - version: '8.6.0-dev', - }, - event: { - agent_id_status: 'missing', - ingested: '2023-05-30T07:57:12Z', - }, - metricset: { - interval: '10m', - name: 'service_summary', - }, - observer: { - hostname: '047e282994fb', - type: 'apm-server', - version: '8.7.0', - }, - processor: { - event: 'metric', - name: 'metric', - }, - service: { - language: { + const writeDoc = () => + es.index({ + refresh: true, + index: 'metrics-apm.service_summary.10m-default', + document: { + '@timestamp': '2023-05-30T07:50:00.000Z', + agent: { name: 'go', }, - name: '___main_elastic_cloud_87_ilm_fix', + data_stream: { + dataset: 'apm.service_summary.10m', + namespace: 'default', + type: 'metrics', + }, + ecs: { + version: '8.6.0-dev', + }, + event: { + agent_id_status: 'missing', + ingested: '2023-05-30T07:57:12Z', + }, + metricset: { + interval: '10m', + name: 'service_summary', + }, + observer: { + hostname: '047e282994fb', + type: 'apm-server', + version: '8.7.0', + }, + processor: { + event: 'metric', + name: 'metric', + }, + service: { + language: { + name: 'go', + }, + name: '___main_elastic_cloud_87_ilm_fix', + }, }, - }, - }); + }); + + await writeDoc(); await supertest .post(`/api/fleet/epm/packages/apm/8.8.0`) @@ -81,6 +85,8 @@ export default function (providerContext: FtrProviderContext) { .send({ force: true }) .expect(200); + // Rollover are lazy need to write a new doc + await writeDoc(); const ds = await es.indices.get({ index: 'metrics-apm.service_summary*', expand_wildcards: ['open', 'hidden'],