-
Notifications
You must be signed in to change notification settings - Fork 8.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Ingest Manager] Adding bulk packages upgrade api #77827
Changes from 1 commit
76df74d
6dea606
268c526
04a39bd
f111406
2846b76
20357fd
9b29802
3d2a686
7cb80e9
ef51825
6ef14cc
df32308
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,6 +71,26 @@ export interface InstallPackageResponse { | |
response: AssetReference[]; | ||
} | ||
|
||
export interface UpgradePackageError { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This name reads like it's an actual Error. Mind prefixing with an |
||
name: string; | ||
statusCode: number; | ||
body: { | ||
message: string; | ||
}; | ||
} | ||
|
||
export interface UpgradePackageInfo { | ||
name: string; | ||
newVersion: string; | ||
// this will be null if no package was present before the upgrade (aka it was an install) | ||
oldVersion: string | null; | ||
assets: AssetReference[]; | ||
} | ||
|
||
export interface UpgradePackagesResponse { | ||
response: Array<UpgradePackageInfo | UpgradePackageError>; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have similar patterns elsewhere. Take a look at https://github.com/elastic/kibana/pull/77690/files#diff-f1874bde49bc5bca72d42584b63a5cf3R143-R147 & https://github.com/elastic/kibana/blob/71b9dedfc4eadd97db2b6b0dc038d0f640babf7b/x-pack/plugins/ingest_manager/common/types/rest_spec/package_policy.ts#L53-L57 I like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think it makes more sense to have the response of the bulk endpoint be something like:
If there's an error then,
|
||
} | ||
|
||
export interface MessageResponse { | ||
response: string; | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -56,10 +56,7 @@ const getHTTPResponseCode = (error: IngestManagerError): number => { | |||||||||||||||||||||||
return 400; // Bad Request | ||||||||||||||||||||||||
}; | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
export const defaultIngestErrorHandler: IngestErrorHandler = async ({ | ||||||||||||||||||||||||
error, | ||||||||||||||||||||||||
response, | ||||||||||||||||||||||||
}: IngestErrorHandlerParams): Promise<IKibanaResponse> => { | ||||||||||||||||||||||||
export function handleIngestError(error: IngestManagerError | Boom | Error) { | ||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extracted this out to its own function so the upgrade api can use it to create an array of errors if an error occurs while upgrading a package. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like the separate function. The only reason it doesn't exist yet is I couldn't come up with a name for the interface it returns. We can keep thinking and update later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To clarify, I'm OK with this as submitted. We can change now or later. Some things I was thinking about:
This function (or this type of function) maps an error to a status code (number) or an interface that contains a status code an maybe some other info. I've yet to figure out what to call it, and was leaning towards a shape like you have here but delayed worrying about it until it came up again. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, maybe we could use kibana/src/core/server/http/router/response.ts Lines 83 to 93 in feceb0f
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah ok, so just to make sure I'm understanding correctly, changing the function signature to this:
Is what you're suggesting? |
||||||||||||||||||||||||
const logger = appContextService.getLogger(); | ||||||||||||||||||||||||
if (isLegacyESClientError(error)) { | ||||||||||||||||||||||||
// there was a problem communicating with ES (e.g. via `callCluster`) | ||||||||||||||||||||||||
|
@@ -72,36 +69,43 @@ export const defaultIngestErrorHandler: IngestErrorHandler = async ({ | |||||||||||||||||||||||
|
||||||||||||||||||||||||
logger.error(message); | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
return response.customError({ | ||||||||||||||||||||||||
return { | ||||||||||||||||||||||||
statusCode: error?.statusCode || error.status, | ||||||||||||||||||||||||
body: { message }, | ||||||||||||||||||||||||
}); | ||||||||||||||||||||||||
}; | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
// our "expected" errors | ||||||||||||||||||||||||
if (error instanceof IngestManagerError) { | ||||||||||||||||||||||||
// only log the message | ||||||||||||||||||||||||
logger.error(error.message); | ||||||||||||||||||||||||
return response.customError({ | ||||||||||||||||||||||||
return { | ||||||||||||||||||||||||
statusCode: getHTTPResponseCode(error), | ||||||||||||||||||||||||
body: { message: error.message }, | ||||||||||||||||||||||||
}); | ||||||||||||||||||||||||
}; | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
// handle any older Boom-based errors or the few places our app uses them | ||||||||||||||||||||||||
if (isBoom(error)) { | ||||||||||||||||||||||||
// only log the message | ||||||||||||||||||||||||
logger.error(error.output.payload.message); | ||||||||||||||||||||||||
return response.customError({ | ||||||||||||||||||||||||
return { | ||||||||||||||||||||||||
statusCode: error.output.statusCode, | ||||||||||||||||||||||||
body: { message: error.output.payload.message }, | ||||||||||||||||||||||||
}); | ||||||||||||||||||||||||
}; | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
// not sure what type of error this is. log as much as possible | ||||||||||||||||||||||||
logger.error(error); | ||||||||||||||||||||||||
return response.customError({ | ||||||||||||||||||||||||
return { | ||||||||||||||||||||||||
statusCode: 500, | ||||||||||||||||||||||||
body: { message: error.message }, | ||||||||||||||||||||||||
}); | ||||||||||||||||||||||||
}; | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
export const defaultIngestErrorHandler: IngestErrorHandler = async ({ | ||||||||||||||||||||||||
jonathan-buttner marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||
error, | ||||||||||||||||||||||||
response, | ||||||||||||||||||||||||
}: IngestErrorHandlerParams): Promise<IKibanaResponse> => { | ||||||||||||||||||||||||
return response.customError(handleIngestError(error)); | ||||||||||||||||||||||||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,6 @@ | |
*/ | ||
import { TypeOf } from '@kbn/config-schema'; | ||
import { RequestHandler, CustomHttpResponseOptions } from 'src/core/server'; | ||
import { appContextService } from '../../services'; | ||
import { | ||
GetInfoResponse, | ||
InstallPackageResponse, | ||
|
@@ -14,6 +13,7 @@ import { | |
GetCategoriesResponse, | ||
GetPackagesResponse, | ||
GetLimitedPackagesResponse, | ||
UpgradePackagesResponse, | ||
} from '../../../common'; | ||
import { | ||
GetCategoriesRequestSchema, | ||
|
@@ -23,6 +23,7 @@ import { | |
InstallPackageFromRegistryRequestSchema, | ||
InstallPackageByUploadRequestSchema, | ||
DeletePackageRequestSchema, | ||
BulkUpgradePackagesFromRegistryRequestSchema, | ||
} from '../../types'; | ||
import { | ||
getCategories, | ||
|
@@ -34,9 +35,9 @@ import { | |
getLimitedPackages, | ||
getInstallationObject, | ||
} from '../../services/epm/packages'; | ||
import { IngestManagerError, defaultIngestErrorHandler } from '../../errors'; | ||
import { defaultIngestErrorHandler } from '../../errors'; | ||
import { splitPkgKey } from '../../services/epm/registry'; | ||
import { getInstallType } from '../../services/epm/packages/install'; | ||
import { handleInstallPackageFailure, upgradePackages } from '../../services/epm/packages/install'; | ||
|
||
export const getCategoriesHandler: RequestHandler< | ||
undefined, | ||
|
@@ -136,13 +137,11 @@ export const installPackageFromRegistryHandler: RequestHandler< | |
undefined, | ||
TypeOf<typeof InstallPackageFromRegistryRequestSchema.body> | ||
> = async (context, request, response) => { | ||
const logger = appContextService.getLogger(); | ||
const savedObjectsClient = context.core.savedObjects.client; | ||
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser; | ||
const { pkgkey } = request.params; | ||
const { pkgName, pkgVersion } = splitPkgKey(pkgkey); | ||
const installedPkg = await getInstallationObject({ savedObjectsClient, pkgName }); | ||
const installType = getInstallType({ pkgVersion, installedPkg }); | ||
neptunian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try { | ||
const res = await installPackage({ | ||
savedObjectsClient, | ||
|
@@ -155,36 +154,38 @@ export const installPackageFromRegistryHandler: RequestHandler< | |
}; | ||
return response.ok({ body }); | ||
} catch (e) { | ||
// could have also done `return defaultIngestErrorHandler({ error: e, response })` at each of the returns, | ||
// but doing it this way will log the outer/install errors before any inner/rollback errors | ||
const defaultResult = await defaultIngestErrorHandler({ error: e, response }); | ||
if (e instanceof IngestManagerError) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extracted this into a function that is shared between the |
||
return defaultResult; | ||
} | ||
await handleInstallPackageFailure( | ||
savedObjectsClient, | ||
e, | ||
pkgName, | ||
pkgVersion, | ||
installedPkg, | ||
callCluster | ||
); | ||
|
||
// if there is an unknown server error, uninstall any package assets or reinstall the previous version if update | ||
try { | ||
if (installType === 'install' || installType === 'reinstall') { | ||
logger.error(`uninstalling ${pkgkey} after error installing`); | ||
await removeInstallation({ savedObjectsClient, pkgkey, callCluster }); | ||
} | ||
if (installType === 'update') { | ||
// @ts-ignore getInstallType ensures we have installedPkg | ||
const prevVersion = `${pkgName}-${installedPkg.attributes.version}`; | ||
logger.error(`rolling back to ${prevVersion} after error installing ${pkgkey}`); | ||
await installPackage({ | ||
savedObjectsClient, | ||
pkgkey: prevVersion, | ||
callCluster, | ||
}); | ||
} | ||
} catch (error) { | ||
logger.error(`failed to uninstall or rollback package after installation error ${error}`); | ||
} | ||
return defaultResult; | ||
} | ||
}; | ||
|
||
export const upgradePackagesFromRegistryHandler: RequestHandler< | ||
undefined, | ||
undefined, | ||
TypeOf<typeof BulkUpgradePackagesFromRegistryRequestSchema.body> | ||
> = async (context, request, response) => { | ||
const savedObjectsClient = context.core.savedObjects.client; | ||
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser; | ||
const res = await upgradePackages({ | ||
savedObjectsClient, | ||
callCluster, | ||
packagesToUpgrade: request.body.upgrade, | ||
}); | ||
const body: UpgradePackagesResponse = { | ||
response: res, | ||
}; | ||
return response.ok({ body }); | ||
}; | ||
|
||
export const installPackageByUploadHandler: RequestHandler< | ||
undefined, | ||
undefined, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,8 @@ | |
|
||
import { SavedObject, SavedObjectsClientContract } from 'src/core/server'; | ||
import semver from 'semver'; | ||
import Boom from 'boom'; | ||
import { UpgradePackageError, UpgradePackageInfo } from '../../../../common'; | ||
import { PACKAGES_SAVED_OBJECT_TYPE, MAX_TIME_COMPLETE_INSTALL } from '../../../constants'; | ||
import { | ||
AssetReference, | ||
|
@@ -17,6 +19,7 @@ import { | |
EsAssetReference, | ||
ElasticsearchAssetType, | ||
InstallType, | ||
RegistrySearchResult, | ||
} from '../../../types'; | ||
import { installIndexPatterns } from '../kibana/index_pattern/install'; | ||
import * as Registry from '../registry'; | ||
|
@@ -32,11 +35,12 @@ import { | |
ArchiveAsset, | ||
} from '../kibana/assets/install'; | ||
import { updateCurrentWriteIndices } from '../elasticsearch/template/template'; | ||
import { deleteKibanaSavedObjectsAssets } from './remove'; | ||
import { PackageOutdatedError } from '../../../errors'; | ||
import { deleteKibanaSavedObjectsAssets, removeInstallation } from './remove'; | ||
import { IngestManagerError, PackageOutdatedError } from '../../../errors'; | ||
import { getPackageSavedObjects } from './get'; | ||
import { installTransformForDataset } from '../elasticsearch/transform/install'; | ||
import { appContextService } from '../../app_context'; | ||
import { handleIngestError } from '../../../errors/handlers'; | ||
|
||
export async function installLatestPackage(options: { | ||
savedObjectsClient: SavedObjectsClientContract; | ||
|
@@ -95,6 +99,117 @@ export async function ensureInstalledPackage(options: { | |
return installation; | ||
} | ||
|
||
export async function handleInstallPackageFailure( | ||
jonathan-buttner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
savedObjectsClient: SavedObjectsClientContract, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please convert this to an object. 6 parameters is about 2x my max. |
||
error: IngestManagerError | Boom | Error, | ||
pkgName: string, | ||
pkgVersion: string, | ||
installedPkg: SavedObject<Installation> | undefined, | ||
callCluster: CallESAsCurrentUser | ||
) { | ||
if (error instanceof IngestManagerError) { | ||
return; | ||
} | ||
const logger = appContextService.getLogger(); | ||
const pkgkey = Registry.pkgToPkgKey({ | ||
name: pkgName, | ||
version: pkgVersion, | ||
}); | ||
|
||
// if there is an unknown server error, uninstall any package assets or reinstall the previous version if update | ||
try { | ||
const installType = getInstallType({ pkgVersion, installedPkg }); | ||
if (installType === 'install' || installType === 'reinstall') { | ||
logger.error(`uninstalling ${pkgkey} after error installing`); | ||
await removeInstallation({ savedObjectsClient, pkgkey, callCluster }); | ||
} | ||
|
||
if (installType === 'update') { | ||
if (!installedPkg) { | ||
logger.error( | ||
`failed to rollback package after installation error ${error} because saved object was undefined` | ||
); | ||
return; | ||
} | ||
const prevVersion = `${pkgName}-${installedPkg.attributes.version}`; | ||
logger.error(`rolling back to ${prevVersion} after error installing ${pkgkey}`); | ||
await installPackage({ | ||
savedObjectsClient, | ||
pkgkey: prevVersion, | ||
callCluster, | ||
}); | ||
} | ||
} catch (e) { | ||
logger.error(`failed to uninstall or rollback package after installation error ${e}`); | ||
} | ||
} | ||
|
||
export async function upgradePackages({ | ||
savedObjectsClient, | ||
packagesToUpgrade, | ||
callCluster, | ||
}: { | ||
savedObjectsClient: SavedObjectsClientContract; | ||
packagesToUpgrade: string[]; | ||
callCluster: CallESAsCurrentUser; | ||
}): Promise<Array<UpgradePackageInfo | UpgradePackageError>> { | ||
const res: Array<UpgradePackageInfo | UpgradePackageError> = []; | ||
|
||
for (const pkgToUpgrade of packagesToUpgrade) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we don't care want to stop on the first failure, I think we can use Promise.allSettled https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/allSettled and loop the results and use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. John and I talked about this and I think we're going to leave it as is for now because the code I came up with using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After we spoke, I took a pass at it #77974 As I said on the call, I'm happy to ship existing version but that PR is essentially what I was describing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks John! I'll pull in your changes. |
||
let installedPkg: SavedObject<Installation> | undefined; | ||
let latestPackage: RegistrySearchResult | undefined; | ||
try { | ||
[installedPkg, latestPackage] = await Promise.all([ | ||
getInstallationObject({ savedObjectsClient, pkgName: pkgToUpgrade }), | ||
Registry.fetchFindLatestPackage(pkgToUpgrade), | ||
]); | ||
} catch (e) { | ||
res.push({ name: pkgToUpgrade, ...handleIngestError(e) }); | ||
continue; | ||
} | ||
|
||
// TODO I think we want version here and not `install_version`? | ||
if (!installedPkg || semver.gt(latestPackage.version, installedPkg.attributes.version)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thoughts on wether this is the right version I'm using here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, version is the currently installed version There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks! |
||
const pkgkey = Registry.pkgToPkgKey({ | ||
name: latestPackage.name, | ||
version: latestPackage.version, | ||
}); | ||
|
||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment re: Promise.allSettled |
||
const assets = await installPackage({ savedObjectsClient, pkgkey, callCluster }); | ||
res.push({ | ||
name: pkgToUpgrade, | ||
newVersion: latestPackage.version, | ||
oldVersion: installedPkg?.attributes.version ?? null, | ||
assets, | ||
}); | ||
} catch (e) { | ||
res.push({ name: pkgToUpgrade, ...handleIngestError(e) }); | ||
await handleInstallPackageFailure( | ||
savedObjectsClient, | ||
e, | ||
latestPackage.name, | ||
latestPackage.version, | ||
installedPkg, | ||
callCluster | ||
); | ||
} | ||
} else { | ||
// package was already at the latest version | ||
res.push({ | ||
name: pkgToUpgrade, | ||
newVersion: latestPackage.version, | ||
oldVersion: latestPackage.version, | ||
assets: [ | ||
...installedPkg.attributes.installed_es, | ||
...installedPkg.attributes.installed_kibana, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the case where no upgrade actually occurs because it was already at the latest version, should we return the current assets? I think this is all the installed assets? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question. Currently we reinstall the package if its already installed, but this functionality would not, so it would be a bit inconsistent, though I'm not sure we should be reinstalling anyway. I think for now its fine to just return the assets. |
||
], | ||
}); | ||
} | ||
} | ||
return res; | ||
} | ||
|
||
export async function installPackage({ | ||
savedObjectsClient, | ||
pkgkey, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was hoping to use
POST /packages
but that is used by the local package upload install API.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
packages/_bulk
like ES also occurred to me but I haven't looked into it.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just put up a bulk actions PR #77690 where I added
/bulk_reassign
and/bulk_unenroll
routes. does it make sense to agree on a convention here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was gonna ask the same thing. Seeing yours makes me say keep bulk_*.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the feedback so are we thinking
/bulk_packages
then? Or should I go with/packages/_bulk
if that works?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
imo this points to the need to add a verb to this route. is my understanding correct that this route is only for upgrading existing packages, rather than being able to bulk install packages? if so, I think it makes sense to differentiate this from the install routes (
/packages
). what do you all think of using/packages/bulk_upgrade
?:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jen-huang It can do both upgrades and installs. If the package hasn't already been installed, it'll just install the latest one available.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in that case I'm happy to leave the route as
/packages_bulk
, or simply switching tobulk_packages
to follow more closely with the bulk actions API convention. although/_bulk
is an ES convention, I've never felt that Kibana APIs need to follow ES patternsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jen-huang @neptunian @jfsiii I'm happy to switch it to
bulk_packages
unless someone objects?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not too fussed but I don't think
bulk_packages
really follows the convention because those are usingbulk_(verb)
likebulk_upgrade
. Since we don't have the verb, it seems redundant to saypackages/bulk_packages
.