Skip to content

Commit

Permalink
[Ingest Manager] fix removing ingest pipelines from elasticsearch (#7…
Browse files Browse the repository at this point in the history
…5092)

* fix removing ingest pipelines bug

* undo unneeded changes to default.yml entry pipeline
  • Loading branch information
neptunian authored Aug 15, 2020
1 parent 48deb7b commit 46a268f
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@

export { installPipelines } from './install';

export { deletePipelines, deletePipeline } from './remove';
export { deletePreviousPipelines, deletePipeline } from './remove';
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,43 @@ import { SavedObjectsClientContract } from 'src/core/server';
import { appContextService } from '../../../';
import { CallESAsCurrentUser, ElasticsearchAssetType } from '../../../../types';
import { getInstallation } from '../../packages/get';
import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../../../common';
import { PACKAGES_SAVED_OBJECT_TYPE, EsAssetReference } from '../../../../../common';

export const deletePipelines = async (
export const deletePreviousPipelines = async (
callCluster: CallESAsCurrentUser,
savedObjectsClient: SavedObjectsClientContract,
pkgName: string,
pkgVersion: string
previousPkgVersion: string
) => {
const logger = appContextService.getLogger();
const previousPipelinesPattern = `*-${pkgName}.*-${pkgVersion}`;

const installation = await getInstallation({ savedObjectsClient, pkgName });
if (!installation) return;
const installedEsAssets = installation.installed_es;
const installedPipelines = installedEsAssets.filter(
({ type, id }) =>
type === ElasticsearchAssetType.ingestPipeline && id.includes(previousPkgVersion)
);
const deletePipelinePromises = installedPipelines.map(({ type, id }) => {
return deletePipeline(callCluster, id);
});
try {
await deletePipeline(callCluster, previousPipelinesPattern);
await Promise.all(deletePipelinePromises);
} catch (e) {
logger.error(e);
}
try {
await deletePipelineRefs(savedObjectsClient, pkgName, pkgVersion);
await deletePipelineRefs(savedObjectsClient, installedEsAssets, pkgName, previousPkgVersion);
} catch (e) {
logger.error(e);
}
};

export const deletePipelineRefs = async (
savedObjectsClient: SavedObjectsClientContract,
installedEsAssets: EsAssetReference[],
pkgName: string,
pkgVersion: string
) => {
const installation = await getInstallation({ savedObjectsClient, pkgName });
if (!installation) return;
const installedEsAssets = installation.installed_es;
const filteredAssets = installedEsAssets.filter(({ type, id }) => {
if (type !== ElasticsearchAssetType.ingestPipeline) return true;
if (!id.includes(pkgVersion)) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import * as Registry from '../registry';
import { getInstallation, getInstallationObject, isRequiredPackage } from './index';
import { installTemplates } from '../elasticsearch/template/install';
import { generateESIndexPatterns } from '../elasticsearch/template/template';
import { installPipelines, deletePipelines } from '../elasticsearch/ingest_pipeline/';
import { installPipelines, deletePreviousPipelines } from '../elasticsearch/ingest_pipeline/';
import { installILMPolicy } from '../elasticsearch/ilm/install';
import {
installKibanaAssets,
Expand Down Expand Up @@ -183,7 +183,7 @@ export async function installPackage({

// if this is an update, delete the previous version's pipelines
if (installedPkg && !reinstall) {
await deletePipelines(
await deletePreviousPipelines(
callCluster,
savedObjectsClient,
pkgName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ export default function (providerContext: FtrProviderContext) {
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}`,
});
expect(res.statusCode).equal(200);
const resPipeline1 = await es.transport.request({
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline1`,
});
expect(resPipeline1.statusCode).equal(200);
const resPipeline2 = await es.transport.request({
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline2`,
});
expect(resPipeline2.statusCode).equal(200);
});
it('should have installed the template components', async function () {
const res = await es.transport.request({
Expand Down Expand Up @@ -135,6 +145,14 @@ export default function (providerContext: FtrProviderContext) {
id: 'logs-all_assets.test_logs-0.1.0',
type: 'ingest_pipeline',
},
{
id: 'logs-all_assets.test_logs-0.1.0-pipeline1',
type: 'ingest_pipeline',
},
{
id: 'logs-all_assets.test_logs-0.1.0-pipeline2',
type: 'ingest_pipeline',
},
{
id: 'logs-all_assets.test_logs',
type: 'index_template',
Expand Down Expand Up @@ -195,6 +213,26 @@ export default function (providerContext: FtrProviderContext) {
}
);
expect(res.statusCode).equal(404);
const resPipeline1 = await es.transport.request(
{
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline1`,
},
{
ignore: [404],
}
);
expect(resPipeline1.statusCode).equal(404);
const resPipeline2 = await es.transport.request(
{
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline2`,
},
{
ignore: [404],
}
);
expect(resPipeline2.statusCode).equal(404);
});
it('should have uninstalled the kibana assets', async function () {
let resDashboard;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,49 @@ export default function (providerContext: FtrProviderContext) {
},
});
});
it('should have installed the new versionized pipeline', async function () {
it('should have installed the new versionized pipelines', async function () {
const res = await es.transport.request({
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgUpdateVersion}`,
});
expect(res.statusCode).equal(200);
const resPipeline1 = await es.transport.request({
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgUpdateVersion}-pipeline1`,
});
expect(resPipeline1.statusCode).equal(200);
});
it('should have removed the old versionized pipelines', async function () {
let res;
try {
res = await es.transport.request({
const res = await es.transport.request(
{
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}`,
});
} catch (err) {
res = err;
}
},
{
ignore: [404],
}
);
expect(res.statusCode).equal(404);
const resPipeline1 = await es.transport.request(
{
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline1`,
},
{
ignore: [404],
}
);
expect(resPipeline1.statusCode).equal(404);
const resPipeline2 = await es.transport.request(
{
method: 'GET',
path: `/_ingest/pipeline/${logsTemplateName}-${pkgVersion}-pipeline2`,
},
{
ignore: [404],
}
);
expect(resPipeline2.statusCode).equal(404);
});
it('should have updated the template components', async function () {
const res = await es.transport.request({
Expand Down Expand Up @@ -272,6 +297,10 @@ export default function (providerContext: FtrProviderContext) {
id: 'logs-all_assets.test_logs-0.2.0',
type: 'ingest_pipeline',
},
{
id: 'logs-all_assets.test_logs-0.2.0-pipeline1',
type: 'ingest_pipeline',
},
{
id: 'logs-all_assets.test_logs',
type: 'index_template',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
description: Pipeline test
processors:
- remove:
field: messag
on_failure:
- set:
field: error.message
value: "{{ _ingest.on_failure_message }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
description: Pipeline test
processors:
- remove:
field: messag
on_failure:
- set:
field: error.message
value: "{{ _ingest.on_failure_message }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
description: Pipeline test
processors:
- remove:
field: messag
on_failure:
- set:
field: error.message
value: "{{ _ingest.on_failure_message }}"

0 comments on commit 46a268f

Please sign in to comment.