Skip to content

Commit

Permalink
[Fleet] Make datastream rollover lazy (#174790) (#176565)
Browse files Browse the repository at this point in the history
## Summary

Add back changes in #174790 after
elastic/elasticsearch#104732 is fixed

Resolve #174480

Co-authored-by: Nicolas Chaulet <nicolas.chaulet@elastic.co>
  • Loading branch information
juliaElastic and nchaulet authored Feb 9, 2024
1 parent bc19b6b commit 240e5ef
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1673,7 +1673,14 @@ describe('EPM template', () => {
},
]);

expect(esClient.indices.rollover).toHaveBeenCalled();
expect(esClient.transport.request).toHaveBeenCalledWith(
expect.objectContaining({
path: '/test.prefix1-default/_rollover',
querystring: {
lazy: true,
},
})
);
});
it('should skip rollover on expected error when flag is on', async () => {
const esClient = elasticsearchServiceMock.createElasticsearchClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,8 +946,12 @@ const getDataStreams = async (
const rolloverDataStream = (dataStreamName: string, esClient: ElasticsearchClient) => {
try {
// Do no wrap rollovers in retryTransientEsErrors since it is not idempotent
return esClient.indices.rollover({
alias: dataStreamName,
return esClient.transport.request({
method: 'POST',
path: `/${dataStreamName}/_rollover`,
querystring: {
lazy: true,
},
});
} catch (error) {
throw new PackageESError(
Expand Down
96 changes: 56 additions & 40 deletions x-pack/test/fleet_api_integration/apis/epm/data_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,43 +41,46 @@ export default function (providerContext: FtrProviderContext) {
skipIfNoDockerRegistry(providerContext);
setupFleetAndAgents(providerContext);

const writeMetricsDoc = (namespace: string) =>
es.transport.request(
{
method: 'POST',
path: `/${metricsTemplateName}-${namespace}/_doc?refresh=true`,
body: {
'@timestamp': new Date().toISOString(),
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_metrics`,
namespace,
type: 'metrics',
},
},
},
{ meta: true }
);

const writeLogsDoc = (namespace: string) =>
es.transport.request(
{
method: 'POST',
path: `/${logsTemplateName}-${namespace}/_doc?refresh=true`,
body: {
'@timestamp': new Date().toISOString(),
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_logs`,
namespace,
type: 'logs',
},
},
},
{ meta: true }
);
beforeEach(async () => {
await installPackage(pkgName, pkgVersion);
await Promise.all(
namespaces.map(async (namespace) => {
const createLogsRequest = es.transport.request(
{
method: 'POST',
path: `/${logsTemplateName}-${namespace}/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_logs`,
namespace,
type: 'logs',
},
},
},
{ meta: true }
);
const createMetricsRequest = es.transport.request(
{
method: 'POST',
path: `/${metricsTemplateName}-${namespace}/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_metrics`,
namespace,
type: 'metrics',
},
},
},
{ meta: true }
);
return Promise.all([createLogsRequest, createMetricsRequest]);
return Promise.all([writeLogsDoc(namespace), writeMetricsDoc(namespace)]);
})
);
});
Expand Down Expand Up @@ -141,7 +144,11 @@ export default function (providerContext: FtrProviderContext) {

it('after update, it should have rolled over logs datastream because mappings are not compatible and not metrics', async function () {
await installPackage(pkgName, pkgUpdateVersion);

await asyncForEach(namespaces, async (namespace) => {
// write doc as rollover is lazy
await writeLogsDoc(namespace);
await writeMetricsDoc(namespace);
const resLogsDatastream = await es.transport.request<any>(
{
method: 'GET',
Expand Down Expand Up @@ -266,6 +273,8 @@ export default function (providerContext: FtrProviderContext) {
})
.expect(200);

// Write a doc to trigger lazy rollover
await writeLogsDoc('default');
// Datastream should have been rolled over
expect(await getLogsDefaultBackingIndicesLength()).to.be(2);
});
Expand Down Expand Up @@ -303,26 +312,29 @@ export default function (providerContext: FtrProviderContext) {
skipIfNoDockerRegistry(providerContext);
setupFleetAndAgents(providerContext);

beforeEach(async () => {
await installPackage(pkgName, pkgVersion);

// Create a sample document so the data stream is created
await es.transport.request(
const writeMetricDoc = (body: any = {}) =>
es.transport.request(
{
method: 'POST',
path: `/${metricsTemplateName}-${namespace}/_doc`,
path: `/${metricsTemplateName}-${namespace}/_doc?refresh=true`,
body: {
'@timestamp': '2015-01-01',
'@timestamp': new Date().toISOString(),
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_logs`,
namespace,
type: 'logs',
},
...body,
},
},
{ meta: true }
);
beforeEach(async () => {
await installPackage(pkgName, pkgVersion);

// Create a sample document so the data stream is created
await writeMetricDoc();
});

afterEach(async () => {
Expand All @@ -340,6 +352,10 @@ export default function (providerContext: FtrProviderContext) {
it('rolls over data stream when index_mode: time_series is set in the updated package version', async () => {
await installPackage(pkgName, pkgUpdateVersion);

// Write a doc so lazy rollover can happen
await writeMetricDoc({
some_field: 'test',
});
const resMetricsDatastream = await es.transport.request<any>(
{
method: 'GET',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,53 +34,59 @@ export default function (providerContext: FtrProviderContext) {
.send({ force: true })
.expect(200);

await es.index({
index: 'metrics-apm.service_summary.10m-default',
document: {
'@timestamp': '2023-05-30T07:50:00.000Z',
agent: {
name: 'go',
},
data_stream: {
dataset: 'apm.service_summary.10m',
namespace: 'default',
type: 'metrics',
},
ecs: {
version: '8.6.0-dev',
},
event: {
agent_id_status: 'missing',
ingested: '2023-05-30T07:57:12Z',
},
metricset: {
interval: '10m',
name: 'service_summary',
},
observer: {
hostname: '047e282994fb',
type: 'apm-server',
version: '8.7.0',
},
processor: {
event: 'metric',
name: 'metric',
},
service: {
language: {
const writeDoc = () =>
es.index({
refresh: true,
index: 'metrics-apm.service_summary.10m-default',
document: {
'@timestamp': '2023-05-30T07:50:00.000Z',
agent: {
name: 'go',
},
name: '___main_elastic_cloud_87_ilm_fix',
data_stream: {
dataset: 'apm.service_summary.10m',
namespace: 'default',
type: 'metrics',
},
ecs: {
version: '8.6.0-dev',
},
event: {
agent_id_status: 'missing',
ingested: '2023-05-30T07:57:12Z',
},
metricset: {
interval: '10m',
name: 'service_summary',
},
observer: {
hostname: '047e282994fb',
type: 'apm-server',
version: '8.7.0',
},
processor: {
event: 'metric',
name: 'metric',
},
service: {
language: {
name: 'go',
},
name: '___main_elastic_cloud_87_ilm_fix',
},
},
},
});
});

await writeDoc();

await supertest
.post(`/api/fleet/epm/packages/apm/8.8.0`)
.set('kbn-xsrf', 'xxxx')
.send({ force: true })
.expect(200);

// Rollover are lazy need to write a new doc
await writeDoc();
const ds = await es.indices.get({
index: 'metrics-apm.service_summary*',
expand_wildcards: ['open', 'hidden'],
Expand Down

0 comments on commit 240e5ef

Please sign in to comment.