Skip to content

Commit

Permalink
[Ingest] fix transform path to work with external ES/cloud (#77640)
Browse files Browse the repository at this point in the history
[Ingest] fix transform path to work with external ES/cloud
  • Loading branch information
nnamdifrankie authored Sep 17, 2020
1 parent 9d1ba4d commit 3e3f9c7
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { SavedObjectsClientContract } from 'kibana/server';
import { Logger, SavedObjectsClientContract } from 'kibana/server';

import { saveInstalledEsRefs } from '../../packages/install';
import * as Registry from '../../registry';
Expand Down Expand Up @@ -33,89 +33,100 @@ export const installTransformForDataset = async (
registryPackage: RegistryPackage,
paths: string[],
callCluster: CallESAsCurrentUser,
savedObjectsClient: SavedObjectsClientContract
savedObjectsClient: SavedObjectsClientContract,
logger: Logger
) => {
const installation = await getInstallation({ savedObjectsClient, pkgName: registryPackage.name });
let previousInstalledTransformEsAssets: EsAssetReference[] = [];
if (installation) {
previousInstalledTransformEsAssets = installation.installed_es.filter(
({ type, id }) => type === ElasticsearchAssetType.transform
try {
const installation = await getInstallation({
savedObjectsClient,
pkgName: registryPackage.name,
});
let previousInstalledTransformEsAssets: EsAssetReference[] = [];
if (installation) {
previousInstalledTransformEsAssets = installation.installed_es.filter(
({ type, id }) => type === ElasticsearchAssetType.transform
);
}

// delete all previous transform
await deleteTransforms(
callCluster,
previousInstalledTransformEsAssets.map((asset) => asset.id)
);
}

// delete all previous transform
await deleteTransforms(
callCluster,
previousInstalledTransformEsAssets.map((asset) => asset.id)
);
// install the latest dataset
const datasets = registryPackage.datasets;
if (!datasets?.length) return [];
const installNameSuffix = `${registryPackage.version}`;

const transformPaths = paths.filter((path) => isTransform(path));
let installedTransforms: EsAssetReference[] = [];
if (transformPaths.length > 0) {
const transformPathDatasets = datasets.reduce<TransformPathDataset[]>((acc, dataset) => {
transformPaths.forEach((path) => {
if (isDatasetTransform(path, dataset.path)) {
acc.push({ path, dataset });
}
});
return acc;
}, []);

const transformRefs = transformPathDatasets.reduce<EsAssetReference[]>(
(acc, transformPathDataset) => {
if (transformPathDataset) {
acc.push({
id: getTransformNameForInstallation(transformPathDataset, installNameSuffix),
type: ElasticsearchAssetType.transform,
});
}
// install the latest dataset
const datasets = registryPackage.datasets;
if (!datasets?.length) return [];
const installNameSuffix = `${registryPackage.version}`;

const transformPaths = paths.filter((path) => isTransform(path));
let installedTransforms: EsAssetReference[] = [];
if (transformPaths.length > 0) {
const transformPathDatasets = datasets.reduce<TransformPathDataset[]>((acc, dataset) => {
transformPaths.forEach((path) => {
if (isDatasetTransform(path, dataset.path)) {
acc.push({ path, dataset });
}
});
return acc;
},
[]
);

// get and save transform refs before installing transforms
await saveInstalledEsRefs(savedObjectsClient, registryPackage.name, transformRefs);

const transforms: TransformInstallation[] = transformPathDatasets.map(
(transformPathDataset: TransformPathDataset) => {
return {
installationName: getTransformNameForInstallation(
transformPathDataset,
installNameSuffix
),
content: getAsset(transformPathDataset.path).toString('utf-8'),
};
}
);
}, []);

const transformRefs = transformPathDatasets.reduce<EsAssetReference[]>(
(acc, transformPathDataset) => {
if (transformPathDataset) {
acc.push({
id: getTransformNameForInstallation(transformPathDataset, installNameSuffix),
type: ElasticsearchAssetType.transform,
});
}
return acc;
},
[]
);

// get and save transform refs before installing transforms
await saveInstalledEsRefs(savedObjectsClient, registryPackage.name, transformRefs);

const transforms: TransformInstallation[] = transformPathDatasets.map(
(transformPathDataset: TransformPathDataset) => {
return {
installationName: getTransformNameForInstallation(
transformPathDataset,
installNameSuffix
),
content: getAsset(transformPathDataset.path).toString('utf-8'),
};
}
);

const installationPromises = transforms.map(async (transform) => {
return installTransform({ callCluster, transform });
});
const installationPromises = transforms.map(async (transform) => {
return installTransform({ callCluster, transform, logger });
});

installedTransforms = await Promise.all(installationPromises).then((results) => results.flat());
}
installedTransforms = await Promise.all(installationPromises).then((results) =>
results.flat()
);
}

if (previousInstalledTransformEsAssets.length > 0) {
const currentInstallation = await getInstallation({
savedObjectsClient,
pkgName: registryPackage.name,
});
if (previousInstalledTransformEsAssets.length > 0) {
const currentInstallation = await getInstallation({
savedObjectsClient,
pkgName: registryPackage.name,
});

// remove the saved object reference
await deleteTransformRefs(
savedObjectsClient,
currentInstallation?.installed_es || [],
registryPackage.name,
previousInstalledTransformEsAssets.map((asset) => asset.id),
installedTransforms.map((installed) => installed.id)
);
// remove the saved object reference
await deleteTransformRefs(
savedObjectsClient,
currentInstallation?.installed_es || [],
registryPackage.name,
previousInstalledTransformEsAssets.map((asset) => asset.id),
installedTransforms.map((installed) => installed.id)
);
}
return installedTransforms;
} catch (err) {
logger.error(err);
throw err;
}
return installedTransforms;
};

const isTransform = (path: string) => {
Expand All @@ -136,24 +147,31 @@ const isDatasetTransform = (path: string, datasetName: string) => {
async function installTransform({
callCluster,
transform,
logger,
}: {
callCluster: CallESAsCurrentUser;
transform: TransformInstallation;
logger: Logger;
}): Promise<EsAssetReference> {
// defer validation on put if the source index is not available
await callCluster('transport.request', {
method: 'PUT',
path: `_transform/${transform.installationName}`,
query: 'defer_validation=true',
body: transform.content,
});

await callCluster('transport.request', {
method: 'POST',
path: `_transform/${transform.installationName}/_start`,
});

return { id: transform.installationName, type: ElasticsearchAssetType.transform };
try {
// defer validation on put if the source index is not available
await callCluster('transport.request', {
method: 'PUT',
path: `/_transform/${transform.installationName}`,
query: 'defer_validation=true',
body: transform.content,
});

await callCluster('transport.request', {
method: 'POST',
path: `/_transform/${transform.installationName}/_start`,
});

return { id: transform.installationName, type: ElasticsearchAssetType.transform };
} catch (err) {
logger.error(err);
throw err;
}
}

const getTransformNameForInstallation = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export const stopTransforms = async (transformIds: string[], callCluster: CallES
for (const transformId of transformIds) {
await callCluster('transport.request', {
method: 'POST',
path: `_transform/${transformId}/_stop`,
path: `/_transform/${transformId}/_stop`,
query: 'force=true',
ignore: [404],
});
Expand All @@ -29,7 +29,7 @@ export const deleteTransforms = async (
await callCluster('transport.request', {
method: 'DELETE',
query: 'force=true',
path: `_transform/${transformId}`,
path: `/_transform/${transformId}`,
ignore: [404],
});
})
Expand Down
Loading

0 comments on commit 3e3f9c7

Please sign in to comment.