diff --git a/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/get_custom_metric_label.ts b/x-pack/plugins/infra/common/formatters/get_custom_metric_label.ts similarity index 91% rename from x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/get_custom_metric_label.ts rename to x-pack/plugins/infra/common/formatters/get_custom_metric_label.ts index 495cc8197d2e7..3be5986d489d3 100644 --- a/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/get_custom_metric_label.ts +++ b/x-pack/plugins/infra/common/formatters/get_custom_metric_label.ts @@ -5,7 +5,7 @@ */ import { i18n } from '@kbn/i18n'; -import { SnapshotCustomMetricInput } from '../../../../../../../common/http_api/snapshot_api'; +import { SnapshotCustomMetricInput } from '../http_api/snapshot_api'; export const getCustomMetricLabel = (metric: SnapshotCustomMetricInput) => { const METRIC_LABELS = { diff --git a/x-pack/plugins/infra/common/utils/corrected_percent_convert.test.ts b/x-pack/plugins/infra/common/utils/corrected_percent_convert.test.ts new file mode 100644 index 0000000000000..ab608f331b884 --- /dev/null +++ b/x-pack/plugins/infra/common/utils/corrected_percent_convert.test.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { decimalToPct, pctToDecimal } from './corrected_percent_convert'; + +describe('decimalToPct', () => { + test('should retain correct floating point precision up to 10 decimal places', () => { + // Most of these cases would still work fine just doing x * 100 instead of passing it through + // decimalToPct, but the function still needs to work regardless + expect(decimalToPct(0)).toBe(0); + expect(decimalToPct(0.1)).toBe(10); + expect(decimalToPct(0.01)).toBe(1); + expect(decimalToPct(0.014)).toBe(1.4); + expect(decimalToPct(0.0141)).toBe(1.41); + expect(decimalToPct(0.01414)).toBe(1.414); + // This case is known to fail without decimalToPct; vanilla JS 0.014141 * 100 === 1.4141000000000001 + expect(decimalToPct(0.014141)).toBe(1.4141); + expect(decimalToPct(0.0141414)).toBe(1.41414); + expect(decimalToPct(0.01414141)).toBe(1.414141); + expect(decimalToPct(0.014141414)).toBe(1.4141414); + }); + test('should also work with values greater than 1', () => { + expect(decimalToPct(2)).toBe(200); + expect(decimalToPct(2.1)).toBe(210); + expect(decimalToPct(2.14)).toBe(214); + expect(decimalToPct(2.14141414)).toBe(214.141414); + }); +}); + +describe('pctToDecimal', () => { + test('should retain correct floating point precision up to 10 decimal places', () => { + expect(pctToDecimal(0)).toBe(0); + expect(pctToDecimal(10)).toBe(0.1); + expect(pctToDecimal(1)).toBe(0.01); + expect(pctToDecimal(1.4)).toBe(0.014); + expect(pctToDecimal(1.41)).toBe(0.0141); + expect(pctToDecimal(1.414)).toBe(0.01414); + expect(pctToDecimal(1.4141)).toBe(0.014141); + expect(pctToDecimal(1.41414)).toBe(0.0141414); + expect(pctToDecimal(1.414141)).toBe(0.01414141); + expect(pctToDecimal(1.4141414)).toBe(0.014141414); + }); + test('should also work with values greater than 100%', () => { + expect(pctToDecimal(200)).toBe(2); + expect(pctToDecimal(210)).toBe(2.1); + expect(pctToDecimal(214)).toBe(2.14); + expect(pctToDecimal(214.141414)).toBe(2.14141414); + }); +}); diff --git a/x-pack/plugins/infra/common/utils/corrected_percent_convert.ts b/x-pack/plugins/infra/common/utils/corrected_percent_convert.ts new file mode 100644 index 0000000000000..a8e3db5133cf5 --- /dev/null +++ b/x-pack/plugins/infra/common/utils/corrected_percent_convert.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +const correctedPctConvert = (v: number, decimalToPct: boolean) => { + // Correct floating point precision + const replacementPattern = decimalToPct ? new RegExp(/0?\./) : '.'; + const numberOfDigits = String(v).replace(replacementPattern, '').length; + const multipliedValue = decimalToPct ? v * 100 : v / 100; + return parseFloat(multipliedValue.toPrecision(numberOfDigits)); +}; + +export const decimalToPct = (v: number) => correctedPctConvert(v, true); +export const pctToDecimal = (v: number) => correctedPctConvert(v, false); diff --git a/x-pack/plugins/infra/public/alerting/inventory/components/alert_flyout.tsx b/x-pack/plugins/infra/public/alerting/inventory/components/alert_flyout.tsx index 804ff9602c81c..834afefd74712 100644 --- a/x-pack/plugins/infra/public/alerting/inventory/components/alert_flyout.tsx +++ b/x-pack/plugins/infra/public/alerting/inventory/components/alert_flyout.tsx @@ -12,6 +12,7 @@ import { useKibana } from '../../../../../../../src/plugins/kibana_react/public' import { METRIC_INVENTORY_THRESHOLD_ALERT_TYPE_ID } from '../../../../server/lib/alerting/inventory_metric_threshold/types'; import { InfraWaffleMapOptions } from '../../../lib/lib'; import { InventoryItemType } from '../../../../common/inventory_models/types'; +import { useAlertPrefillContext } from '../../../alerting/use_alert_prefill'; interface Props { visible?: boolean; @@ -21,16 +22,24 @@ interface Props { setVisible: React.Dispatch>; } -export const AlertFlyout = (props: Props) => { +export const AlertFlyout = ({ options, nodeType, filter, visible, setVisible }: Props) => { const { triggersActionsUI } = useContext(TriggerActionsContext); const { services } = useKibana(); + const { inventoryPrefill } = useAlertPrefillContext(); + const { customMetrics } = inventoryPrefill; + return ( <> {triggersActionsUI && ( { }} > ({ + useSourceViaHttp: () => ({ + source: { id: 'default' }, + createDerivedIndexPattern: () => ({ fields: [], title: 'metricbeat-*' }), + }), +})); + +const exampleCustomMetric = { + id: 'this-is-an-id', + field: 'some.system.field', + aggregation: 'rate', + type: 'custom', +} as SnapshotCustomMetricInput; + +describe('Expression', () => { + async function setup(currentOptions: AlertContextMeta) { + const alertParams = { + criteria: [], + nodeType: undefined, + filterQueryText: '', + }; + + const mocks = coreMock.createSetup(); + const startMocks = coreMock.createStart(); + const [ + { + application: { capabilities }, + }, + ] = await mocks.getStartServices(); + + const context: AlertsContextValue = { + http: mocks.http, + toastNotifications: mocks.notifications.toasts, + actionTypeRegistry: actionTypeRegistryMock.create() as any, + alertTypeRegistry: alertTypeRegistryMock.create() as any, + docLinks: startMocks.docLinks, + capabilities: { + ...capabilities, + actions: { + delete: true, + save: true, + show: true, + }, + }, + metadata: currentOptions, + }; + + const wrapper = mountWithIntl( + Reflect.set(alertParams, key, value)} + setAlertProperty={() => {}} + /> + ); + + const update = async () => + await act(async () => { + await nextTick(); + wrapper.update(); + }); + + await update(); + + return { wrapper, update, alertParams }; + } + + it('should prefill the alert using the context metadata', async () => { + const currentOptions = { + filter: 'foo', + nodeType: 'pod', + customMetrics: [], + options: { metric: { type: 'memory' } }, + }; + const { alertParams } = await setup(currentOptions as AlertContextMeta); + expect(alertParams.nodeType).toBe('pod'); + expect(alertParams.filterQueryText).toBe('foo'); + expect(alertParams.criteria).toEqual([ + { + metric: 'memory', + comparator: Comparator.GT, + threshold: [], + timeSize: 1, + timeUnit: 'm', + }, + ]); + }); + describe('using custom metrics', () => { + it('should prefill the alert using the context metadata', async () => { + const currentOptions = { + filter: '', + nodeType: 'tx', + customMetrics: [exampleCustomMetric], + options: { metric: exampleCustomMetric }, + }; + const { alertParams, update } = await setup(currentOptions as AlertContextMeta); + await update(); + expect(alertParams.nodeType).toBe('tx'); + expect(alertParams.filterQueryText).toBe(''); + expect(alertParams.criteria).toEqual([ + { + metric: 'custom', + comparator: Comparator.GT, + threshold: [], + timeSize: 1, + timeUnit: 'm', + customMetric: exampleCustomMetric, + }, + ]); + }); + }); +}); + +describe('ExpressionRow', () => { + async function setup(expression: InventoryMetricConditions) { + const wrapper = mountWithIntl( + {}} + addExpression={() => {}} + key={1} + expressionId={1} + setAlertParams={() => {}} + errors={{ + aggField: [], + timeSizeUnit: [], + timeWindowSize: [], + metric: [], + }} + expression={expression} + alertsContextMetadata={{ + customMetrics: [], + }} + /> + ); + + const update = async () => + await act(async () => { + await nextTick(); + wrapper.update(); + }); + + await update(); + + return { wrapper, update }; + } + const expression = { + metric: 'custom', + comparator: Comparator.GT, + threshold: [], + timeSize: 1, + timeUnit: 'm', + customMetric: exampleCustomMetric, + }; + + it('loads custom metrics passed in through the expression, even with an empty context', async () => { + const { wrapper } = await setup(expression as InventoryMetricConditions); + const [valueMatch] = + wrapper.html().match('Rate of some.system.field') ?? + []; + expect(valueMatch).toBeTruthy(); + }); +}); diff --git a/x-pack/plugins/infra/public/alerting/inventory/components/expression.tsx b/x-pack/plugins/infra/public/alerting/inventory/components/expression.tsx index 7ca17617871ff..78cabcf354437 100644 --- a/x-pack/plugins/infra/public/alerting/inventory/components/expression.tsx +++ b/x-pack/plugins/infra/public/alerting/inventory/components/expression.tsx @@ -4,7 +4,8 @@ * you may not use this file except in compliance with the Elastic License. */ -import { debounce, pick } from 'lodash'; +import { set } from '@elastic/safer-lodash-set'; +import { debounce, pick, uniqBy, isEqual } from 'lodash'; import { Unit } from '@elastic/datemath'; import React, { useCallback, useMemo, useEffect, useState, ChangeEvent } from 'react'; import { @@ -22,6 +23,7 @@ import { } from '@elastic/eui'; import { FormattedMessage } from '@kbn/i18n/react'; import { i18n } from '@kbn/i18n'; +import { getCustomMetricLabel } from '../../../../common/formatters/get_custom_metric_label'; import { toMetricOpt } from '../../../../common/snapshot_metric_i18n'; import { AlertPreview } from '../../common'; import { METRIC_INVENTORY_THRESHOLD_ALERT_TYPE_ID } from '../../../../common/alerting/metrics'; @@ -49,22 +51,31 @@ import { hostMetricTypes } from '../../../../common/inventory_models/host/toolba import { containerMetricTypes } from '../../../../common/inventory_models/container/toolbar_items'; import { podMetricTypes } from '../../../../common/inventory_models/pod/toolbar_items'; import { findInventoryModel } from '../../../../common/inventory_models'; -import { InventoryItemType, SnapshotMetricType } from '../../../../common/inventory_models/types'; +import { + InventoryItemType, + SnapshotMetricType, + SnapshotMetricTypeRT, +} from '../../../../common/inventory_models/types'; // eslint-disable-next-line @kbn/eslint/no-restricted-paths import { InventoryMetricConditions } from '../../../../server/lib/alerting/inventory_metric_threshold/types'; import { MetricExpression } from './metric'; import { NodeTypeExpression } from './node_type'; import { InfraWaffleMapOptions } from '../../../lib/lib'; import { convertKueryToElasticSearchQuery } from '../../../utils/kuery'; +import { + SnapshotCustomMetricInput, + SnapshotCustomMetricInputRT, +} from '../../../../common/http_api/snapshot_api'; import { validateMetricThreshold } from './validation'; const FILTER_TYPING_DEBOUNCE_MS = 500; -interface AlertContextMeta { +export interface AlertContextMeta { options?: Partial; nodeType?: InventoryItemType; filter?: string; + customMetrics?: SnapshotCustomMetricInput[]; } interface Props { @@ -89,6 +100,7 @@ const defaultExpression = { threshold: [], timeSize: 1, timeUnit: 'm', + customMetric: undefined, } as InventoryMetricConditions; export const Expressions: React.FC = (props) => { @@ -204,6 +216,9 @@ export const Expressions: React.FC = (props) => { { ...defaultExpression, metric: md.options.metric!.type, + customMetric: SnapshotCustomMetricInputRT.is(md.options.metric) + ? md.options.metric + : undefined, } as InventoryMetricConditions, ]); } else { @@ -282,6 +297,7 @@ export const Expressions: React.FC = (props) => { setAlertParams={updateParams} errors={errors[idx] || emptyError} expression={e || {}} + alertsContextMetadata={alertsContext.metadata} /> ); })} @@ -389,6 +405,7 @@ interface ExpressionRowProps { addExpression(): void; remove(id: number): void; setAlertParams(id: number, params: Partial): void; + alertsContextMetadata: AlertsContextValue['metadata']; } const StyledExpressionRow = euiStyled(EuiFlexGroup)` @@ -402,14 +419,48 @@ const StyledExpression = euiStyled.div` `; export const ExpressionRow: React.FC = (props) => { - const { setAlertParams, expression, errors, expressionId, remove, canDelete } = props; - const { metric, comparator = Comparator.GT, threshold = [] } = expression; + const { + setAlertParams, + expression, + errors, + expressionId, + remove, + canDelete, + alertsContextMetadata, + } = props; + const { metric, comparator = Comparator.GT, threshold = [], customMetric } = expression; + const [customMetrics, updateCustomMetrics] = useState([]); + + // Create and uniquify a list of custom metrics including: + // - The alert metadata context (which only gives us custom metrics on the inventory page) + // - The custom metric stored in the expression (necessary when editing this alert without having + // access to the metadata context) + // - Whatever custom metrics were previously stored in this list (to preserve the custom metric in the dropdown + // if the user edits the alert and switches away from the custom metric) + useEffect(() => { + const ctxCustomMetrics = alertsContextMetadata?.customMetrics ?? []; + const expressionCustomMetrics = customMetric ? [customMetric] : []; + const newCustomMetrics = uniqBy( + [...customMetrics, ...ctxCustomMetrics, ...expressionCustomMetrics], + (cm: SnapshotCustomMetricInput) => cm.id + ); + if (!isEqual(customMetrics, newCustomMetrics)) updateCustomMetrics(newCustomMetrics); + }, [alertsContextMetadata, customMetric, customMetrics, updateCustomMetrics]); const updateMetric = useCallback( - (m?: SnapshotMetricType) => { - setAlertParams(expressionId, { ...expression, metric: m }); + (m?: SnapshotMetricType | string) => { + const newMetric = SnapshotMetricTypeRT.is(m) ? m : 'custom'; + const newAlertParams = { ...expression, metric: newMetric }; + if (newMetric === 'custom' && customMetrics) { + set( + newAlertParams, + 'customMetric', + customMetrics.find((cm) => cm.id === m) + ); + } + setAlertParams(expressionId, newAlertParams); }, - [expressionId, expression, setAlertParams] + [expressionId, expression, setAlertParams, customMetrics] ); const updateComparator = useCallback( @@ -446,6 +497,7 @@ export const ExpressionRow: React.FC = (props) => { break; case 'host': myMetrics = hostMetricTypes; + break; case 'pod': myMetrics = podMetricTypes; @@ -454,8 +506,17 @@ export const ExpressionRow: React.FC = (props) => { myMetrics = containerMetricTypes; break; } - return myMetrics.map(toMetricOpt); - }, [props.nodeType]); + const baseMetricOpts = myMetrics.map(toMetricOpt); + const customMetricOpts = customMetrics + ? customMetrics.map((m, i) => ({ + text: getCustomMetricLabel(m), + value: m.id, + })) + : []; + return [...baseMetricOpts, ...customMetricOpts]; + }, [props.nodeType, customMetrics]); + + const selectedMetricValue = metric === 'custom' && customMetric ? customMetric.id : metric!; return ( <> @@ -465,8 +526,8 @@ export const ExpressionRow: React.FC = (props) => { v?.value === metric)?.text || '', + value: selectedMetricValue, + text: ofFields.find((v) => v?.value === selectedMetricValue)?.text || '', }} metrics={ ofFields.filter((m) => m !== undefined && m.value !== undefined) as Array<{ @@ -568,4 +629,5 @@ const metricUnit: Record = { s3DownloadBytes: { label: 'bytes' }, sqsOldestMessage: { label: 'seconds' }, rdsLatency: { label: 'ms' }, + custom: { label: '' }, }; diff --git a/x-pack/plugins/infra/public/alerting/inventory/components/metric.tsx b/x-pack/plugins/infra/public/alerting/inventory/components/metric.tsx index ff859a95a3d9d..2e5ccbe1a4276 100644 --- a/x-pack/plugins/infra/public/alerting/inventory/components/metric.tsx +++ b/x-pack/plugins/infra/public/alerting/inventory/components/metric.tsx @@ -18,13 +18,12 @@ import { import { EuiPopoverTitle, EuiButtonIcon } from '@elastic/eui'; // eslint-disable-next-line @kbn/eslint/no-restricted-paths import { IErrorObject } from '../../../../../triggers_actions_ui/public/types'; -import { SnapshotMetricType } from '../../../../common/inventory_models/types'; interface Props { - metric?: { value: SnapshotMetricType; text: string }; + metric?: { value: string; text: string }; metrics: Array<{ value: string; text: string }>; errors: IErrorObject; - onChange: (metric?: SnapshotMetricType) => void; + onChange: (metric?: string) => void; popupPosition?: | 'upCenter' | 'upLeft' @@ -104,7 +103,7 @@ export const MetricExpression = ({ metric, metrics, errors, onChange, popupPosit renderOption={(o: any) => o.label} onChange={(selectedOptions) => { if (selectedOptions.length > 0) { - onChange(selectedOptions[0].value as SnapshotMetricType); + onChange(selectedOptions[0].value); setAggFieldPopoverOpen(false); } else { onChange(); diff --git a/x-pack/plugins/infra/public/alerting/inventory/hooks/use_inventory_alert_prefill.ts b/x-pack/plugins/infra/public/alerting/inventory/hooks/use_inventory_alert_prefill.ts index d659057b95ed9..a57f9cafa5e19 100644 --- a/x-pack/plugins/infra/public/alerting/inventory/hooks/use_inventory_alert_prefill.ts +++ b/x-pack/plugins/infra/public/alerting/inventory/hooks/use_inventory_alert_prefill.ts @@ -5,20 +5,26 @@ */ import { useState } from 'react'; -import { SnapshotMetricInput } from '../../../../common/http_api/snapshot_api'; +import { + SnapshotMetricInput, + SnapshotCustomMetricInput, +} from '../../../../common/http_api/snapshot_api'; import { InventoryItemType } from '../../../../common/inventory_models/types'; export const useInventoryAlertPrefill = () => { const [nodeType, setNodeType] = useState('host'); const [filterQuery, setFilterQuery] = useState(); const [metric, setMetric] = useState({ type: 'cpu' }); + const [customMetrics, setCustomMetrics] = useState([]); return { nodeType, filterQuery, metric, + customMetrics, setNodeType, setFilterQuery, setMetric, + setCustomMetrics, }; }; diff --git a/x-pack/plugins/infra/public/alerting/metric_threshold/components/expression_row.test.tsx b/x-pack/plugins/infra/public/alerting/metric_threshold/components/expression_row.test.tsx new file mode 100644 index 0000000000000..d5be8a2ec2675 --- /dev/null +++ b/x-pack/plugins/infra/public/alerting/metric_threshold/components/expression_row.test.tsx @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { mountWithIntl, nextTick } from 'test_utils/enzyme_helpers'; +import { MetricExpression } from '../types'; +import React from 'react'; +import { ExpressionRow } from './expression_row'; +import { act } from 'react-dom/test-utils'; +// eslint-disable-next-line @kbn/eslint/no-restricted-paths +import { Comparator } from '../../../../server/lib/alerting/metric_threshold/types'; + +jest.mock('../../../containers/source/use_source_via_http', () => ({ + useSourceViaHttp: () => ({ + source: { id: 'default' }, + createDerivedIndexPattern: () => ({ fields: [], title: 'metricbeat-*' }), + }), +})); + +describe('ExpressionRow', () => { + async function setup(expression: MetricExpression) { + const wrapper = mountWithIntl( + {}} + addExpression={() => {}} + key={1} + expressionId={1} + setAlertParams={() => {}} + errors={{ + aggField: [], + timeSizeUnit: [], + timeWindowSize: [], + }} + expression={expression} + /> + ); + + const update = async () => + await act(async () => { + await nextTick(); + wrapper.update(); + }); + + await update(); + + return { wrapper, update }; + } + + it('should display thresholds as a percentage for pct metrics', async () => { + const expression = { + metric: 'system.cpu.user.pct', + comparator: Comparator.GT, + threshold: [0.5], + timeSize: 1, + timeUnit: 'm', + aggType: 'avg', + }; + const { wrapper } = await setup(expression as MetricExpression); + const [valueMatch] = wrapper.html().match('50') ?? []; + expect(valueMatch).toBeTruthy(); + }); + + it('should display thresholds as a decimal for all other metrics', async () => { + const expression = { + metric: 'system.load.1', + comparator: Comparator.GT, + threshold: [0.5], + timeSize: 1, + timeUnit: 'm', + aggType: 'avg', + }; + const { wrapper } = await setup(expression as MetricExpression); + const [valueMatch] = + wrapper.html().match('0.5') ?? []; + expect(valueMatch).toBeTruthy(); + }); +}); diff --git a/x-pack/plugins/infra/public/alerting/metric_threshold/components/expression_row.tsx b/x-pack/plugins/infra/public/alerting/metric_threshold/components/expression_row.tsx index 653b9e1d5c308..1487557bde3a0 100644 --- a/x-pack/plugins/infra/public/alerting/metric_threshold/components/expression_row.tsx +++ b/x-pack/plugins/infra/public/alerting/metric_threshold/components/expression_row.tsx @@ -3,10 +3,11 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -import React, { useCallback, useState } from 'react'; +import React, { useCallback, useState, useMemo } from 'react'; import { i18n } from '@kbn/i18n'; -import { EuiFlexGroup, EuiFlexItem, EuiButtonIcon, EuiSpacer } from '@elastic/eui'; +import { EuiFlexGroup, EuiFlexItem, EuiButtonIcon, EuiSpacer, EuiText } from '@elastic/eui'; import { IFieldType } from 'src/plugins/data/public'; +import { pctToDecimal, decimalToPct } from '../../../../common/utils/corrected_percent_convert'; import { WhenExpression, OfExpression, @@ -76,6 +77,8 @@ export const ExpressionRow: React.FC = (props) => { threshold = [], } = expression; + const isMetricPct = useMemo(() => metric && metric.endsWith('.pct'), [metric]); + const updateAggType = useCallback( (at: string) => { setAlertParams(expressionId, { @@ -102,14 +105,22 @@ export const ExpressionRow: React.FC = (props) => { ); const updateThreshold = useCallback( - (t) => { + (enteredThreshold) => { + const t = isMetricPct + ? enteredThreshold.map((v: number) => pctToDecimal(v)) + : enteredThreshold; if (t.join() !== expression.threshold.join()) { setAlertParams(expressionId, { ...expression, threshold: t }); } }, - [expressionId, expression, setAlertParams] + [expressionId, expression, isMetricPct, setAlertParams] ); + const displayedThreshold = useMemo(() => { + if (isMetricPct) return threshold.map((v) => decimalToPct(v)); + return threshold; + }, [threshold, isMetricPct]); + return ( <> @@ -149,13 +160,22 @@ export const ExpressionRow: React.FC = (props) => { + {isMetricPct && ( +
+ % +
+ )} {canDelete && ( diff --git a/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/index.tsx b/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/index.tsx index aae787c8c0395..96169810e02a8 100644 --- a/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/index.tsx +++ b/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/index.tsx @@ -8,13 +8,13 @@ import { EuiPopover } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; import React, { useState, useCallback } from 'react'; import { IFieldType } from 'src/plugins/data/public'; +import { getCustomMetricLabel } from '../../../../../../../common/formatters/get_custom_metric_label'; import { SnapshotMetricInput, SnapshotCustomMetricInput, SnapshotCustomMetricInputRT, } from '../../../../../../../common/http_api/snapshot_api'; import { CustomMetricForm } from './custom_metric_form'; -import { getCustomMetricLabel } from './get_custom_metric_label'; import { MetricsContextMenu } from './metrics_context_menu'; import { ModeSwitcher } from './mode_switcher'; import { MetricsEditMode } from './metrics_edit_mode'; diff --git a/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/metrics_context_menu.tsx b/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/metrics_context_menu.tsx index 7ab90297ebbb3..71a000d3165b6 100644 --- a/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/metrics_context_menu.tsx +++ b/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/metrics_context_menu.tsx @@ -5,6 +5,7 @@ */ import React, { useCallback } from 'react'; import { EuiContextMenuPanelDescriptor, EuiContextMenu } from '@elastic/eui'; +import { getCustomMetricLabel } from '../../../../../../../common/formatters/get_custom_metric_label'; import { SnapshotMetricInput, SnapshotCustomMetricInput, @@ -14,7 +15,6 @@ import { SnapshotMetricTypeRT, SnapshotMetricType, } from '../../../../../../../common/inventory_models/types'; -import { getCustomMetricLabel } from './get_custom_metric_label'; interface Props { options: Array<{ text: string; value: string }>; diff --git a/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/metrics_edit_mode.tsx b/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/metrics_edit_mode.tsx index 649dcc4282d67..e75885ccbc917 100644 --- a/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/metrics_edit_mode.tsx +++ b/x-pack/plugins/infra/public/pages/metrics/inventory_view/components/waffle/metric_control/metrics_edit_mode.tsx @@ -6,8 +6,8 @@ import React from 'react'; import { EuiFlexItem, EuiFlexGroup, EuiButtonIcon } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; +import { getCustomMetricLabel } from '../../../../../../../common/formatters/get_custom_metric_label'; import { SnapshotCustomMetricInput } from '../../../../../../../common/http_api/snapshot_api'; -import { getCustomMetricLabel } from './get_custom_metric_label'; import { EuiTheme, withTheme, diff --git a/x-pack/plugins/infra/public/pages/metrics/inventory_view/hooks/use_waffle_options.test.ts b/x-pack/plugins/infra/public/pages/metrics/inventory_view/hooks/use_waffle_options.test.ts index 579073e9500d0..d44d8fca4faba 100644 --- a/x-pack/plugins/infra/public/pages/metrics/inventory_view/hooks/use_waffle_options.test.ts +++ b/x-pack/plugins/infra/public/pages/metrics/inventory_view/hooks/use_waffle_options.test.ts @@ -20,6 +20,7 @@ jest.mock('react-router-dom', () => ({ // reassign them, so we can't make these both part of the same object let PREFILL_NODETYPE: WaffleOptionsState['nodeType'] | undefined; let PREFILL_METRIC: WaffleOptionsState['metric'] | undefined; +let PREFILL_CUSTOM_METRICS: WaffleOptionsState['customMetrics'] | undefined; jest.mock('../../../../alerting/use_alert_prefill', () => ({ useAlertPrefillContext: () => ({ inventoryPrefill: { @@ -29,6 +30,9 @@ jest.mock('../../../../alerting/use_alert_prefill', () => ({ setMetric(metric: WaffleOptionsState['metric']) { PREFILL_METRIC = metric; }, + setCustomMetrics(customMetrics: WaffleOptionsState['customMetrics']) { + PREFILL_CUSTOM_METRICS = customMetrics; + }, }, }), })); @@ -39,6 +43,7 @@ describe('useWaffleOptions', () => { beforeEach(() => { PREFILL_NODETYPE = undefined; PREFILL_METRIC = undefined; + PREFILL_CUSTOM_METRICS = undefined; }); it('should sync the options to the inventory alert preview context', () => { @@ -47,6 +52,15 @@ describe('useWaffleOptions', () => { const newOptions = { nodeType: 'pod', metric: { type: 'memory' }, + customMetrics: [ + { + type: 'custom', + id: + "i don't want to bother to copy and paste an actual uuid so instead i'm going to smash my keyboard skjdghsjodkyjheurvjnsgn", + aggregation: 'avg', + field: 'hey.system.are.you.good', + }, + ], } as WaffleOptionsState; act(() => { result.current.changeNodeType(newOptions.nodeType); @@ -58,5 +72,10 @@ describe('useWaffleOptions', () => { }); rerender(); expect(PREFILL_METRIC).toEqual(newOptions.metric); + act(() => { + result.current.changeCustomMetrics(newOptions.customMetrics); + }); + rerender(); + expect(PREFILL_CUSTOM_METRICS).toEqual(newOptions.customMetrics); }); }); diff --git a/x-pack/plugins/infra/public/pages/metrics/inventory_view/hooks/use_waffle_options.ts b/x-pack/plugins/infra/public/pages/metrics/inventory_view/hooks/use_waffle_options.ts index 8059d1ad12a3a..35d069adc939e 100644 --- a/x-pack/plugins/infra/public/pages/metrics/inventory_view/hooks/use_waffle_options.ts +++ b/x-pack/plugins/infra/public/pages/metrics/inventory_view/hooks/use_waffle_options.ts @@ -125,9 +125,10 @@ export const useWaffleOptions = () => { const { inventoryPrefill } = useAlertPrefillContext(); useEffect(() => { - const { setNodeType, setMetric } = inventoryPrefill; + const { setNodeType, setMetric, setCustomMetrics } = inventoryPrefill; setNodeType(state.nodeType); setMetric(state.metric); + setCustomMetrics(state.customMetrics); }, [state, inventoryPrefill]); return { diff --git a/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/evaluate_condition.ts b/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/evaluate_condition.ts index 3b795810b39f0..9be6a4b52157c 100644 --- a/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/evaluate_condition.ts +++ b/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/evaluate_condition.ts @@ -5,6 +5,7 @@ */ import { mapValues, last, first } from 'lodash'; import moment from 'moment'; +import { SnapshotCustomMetricInput } from '../../../../common/http_api/snapshot_api'; import { isTooManyBucketsPreviewException, TOO_MANY_BUCKETS_PREVIEW_EXCEPTION, @@ -37,7 +38,7 @@ export const evaluateCondition = async ( filterQuery?: string, lookbackSize?: number ): Promise> => { - const { comparator, metric } = condition; + const { comparator, metric, customMetric } = condition; let { threshold } = condition; const timerange = { @@ -55,7 +56,8 @@ export const evaluateCondition = async ( metric, timerange, sourceConfiguration, - filterQuery + filterQuery, + customMetric ); threshold = threshold.map((n) => convertMetricValue(metric, n)); @@ -93,19 +95,24 @@ const getData = async ( metric: SnapshotMetricType, timerange: InfraTimerangeInput, sourceConfiguration: InfraSourceConfiguration, - filterQuery?: string + filterQuery?: string, + customMetric?: SnapshotCustomMetricInput ) => { const snapshot = new InfraSnapshot(); const esClient = ( options: CallWithRequestParams ): Promise> => callCluster('search', options); + const metrics = [ + metric === 'custom' ? (customMetric as SnapshotCustomMetricInput) : { type: metric }, + ]; + const options = { filterQuery: parseFilterQuery(filterQuery), nodeType, groupBy: [], sourceConfiguration, - metrics: [{ type: metric }], + metrics, timerange, includeTimeseries: Boolean(timerange.lookbackSize), }; diff --git a/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/inventory_metric_threshold_executor.ts b/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/inventory_metric_threshold_executor.ts index 7b816f2f225b5..db1ff26ee1810 100644 --- a/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/inventory_metric_threshold_executor.ts +++ b/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/inventory_metric_threshold_executor.ts @@ -6,6 +6,7 @@ import { first, get, last } from 'lodash'; import { i18n } from '@kbn/i18n'; import moment from 'moment'; +import { getCustomMetricLabel } from '../../../../common/formatters/get_custom_metric_label'; import { toMetricOpt } from '../../../../common/snapshot_metric_i18n'; import { AlertStates, InventoryMetricConditions } from './types'; import { AlertExecutorOptions } from '../../../../../alerts/server'; @@ -77,27 +78,19 @@ export const createInventoryMetricThresholdExecutor = (libs: InfraBackendLibs) = let reason; if (nextState === AlertStates.ALERT) { reason = results - .map((result) => { - if (!result[item]) return ''; - const resultWithVerboseMetricName = { - ...result[item], - metric: toMetricOpt(result[item].metric)?.text || result[item].metric, - currentValue: formatMetric(result[item].metric, result[item].currentValue), - }; - return buildFiredAlertReason(resultWithVerboseMetricName); - }) + .map((result) => buildReasonWithVerboseMetricName(result[item], buildFiredAlertReason)) .join('\n'); } if (alertOnNoData) { if (nextState === AlertStates.NO_DATA) { reason = results .filter((result) => result[item].isNoData) - .map((result) => buildNoDataAlertReason(result[item])) + .map((result) => buildReasonWithVerboseMetricName(result[item], buildNoDataAlertReason)) .join('\n'); } else if (nextState === AlertStates.ERROR) { reason = results .filter((result) => result[item].isError) - .map((result) => buildErrorAlertReason(result[item].metric)) + .map((result) => buildReasonWithVerboseMetricName(result[item], buildErrorAlertReason)) .join('\n'); } } @@ -121,6 +114,20 @@ export const createInventoryMetricThresholdExecutor = (libs: InfraBackendLibs) = } }; +const buildReasonWithVerboseMetricName = (resultItem: any, buildReason: (r: any) => string) => { + if (!resultItem) return ''; + const resultWithVerboseMetricName = { + ...resultItem, + metric: + toMetricOpt(resultItem.metric)?.text || + (resultItem.metric === 'custom' + ? getCustomMetricLabel(resultItem.customMetric) + : resultItem.metric), + currentValue: formatMetric(resultItem.metric, resultItem.currentValue), + }; + return buildReason(resultWithVerboseMetricName); +}; + const mapToConditionsLookup = ( list: any[], mapFn: (value: any, index: number, array: any[]) => unknown @@ -140,10 +147,6 @@ export const FIRED_ACTIONS = { }; const formatMetric = (metric: SnapshotMetricType, value: number) => { - // if (SnapshotCustomMetricInputRT.is(metric)) { - // const formatter = createFormatterForMetric(metric); - // return formatter(val); - // } const metricFormatter = get(METRIC_FORMATTERS, metric, METRIC_FORMATTERS.count); if (value == null) { return ''; diff --git a/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/register_inventory_metric_threshold_alert_type.ts b/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/register_inventory_metric_threshold_alert_type.ts index f664a59acd165..14d1acf0e4a9f 100644 --- a/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/register_inventory_metric_threshold_alert_type.ts +++ b/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/register_inventory_metric_threshold_alert_type.ts @@ -27,6 +27,15 @@ const condition = schema.object({ timeUnit: schema.string(), timeSize: schema.number(), metric: schema.string(), + customMetric: schema.maybe( + schema.object({ + type: schema.literal('custom'), + id: schema.string(), + field: schema.string(), + aggregation: schema.string(), + label: schema.maybe(schema.string()), + }) + ), }); export const registerMetricInventoryThresholdAlertType = (libs: InfraBackendLibs) => ({ diff --git a/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/types.ts b/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/types.ts index 86c77e6d7459a..06f5efaf9eb36 100644 --- a/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/types.ts +++ b/x-pack/plugins/infra/server/lib/alerting/inventory_metric_threshold/types.ts @@ -4,6 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ import { Unit } from '@elastic/datemath'; +import { SnapshotCustomMetricInput } from '../../../../common/http_api/snapshot_api'; import { SnapshotMetricType } from '../../../../common/inventory_models/types'; import { Comparator, AlertStates } from '../common/types'; @@ -18,4 +19,5 @@ export interface InventoryMetricConditions { sourceId?: string; threshold: number[]; comparator: Comparator; + customMetric?: SnapshotCustomMetricInput; } diff --git a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.test.ts b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.test.ts index fa705798baf7a..3a52bb6b6ce71 100644 --- a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.test.ts +++ b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.test.ts @@ -387,6 +387,36 @@ describe('The metric threshold alert type', () => { // expect(getState(instanceID).alertState).toBe(AlertStates.OK); // }); // }); + + describe('querying a metric with a percentage metric', () => { + const instanceID = '*'; + const execute = () => + executor({ + services, + params: { + sourceId: 'default', + criteria: [ + { + ...baseCriterion, + metric: 'test.metric.pct', + comparator: Comparator.GT, + threshold: [0.75], + }, + ], + }, + }); + test('reports values converted from decimals to percentages to the action context', async () => { + const now = 1577858400000; + await execute(); + const { action } = mostRecentAction(instanceID); + expect(action.group).toBe('*'); + expect(action.reason).toContain('current value is 100%'); + expect(action.reason).toContain('threshold of 75%'); + expect(action.threshold.condition0[0]).toBe('75%'); + expect(action.value.condition0).toBe('100%'); + expect(action.timestamp).toBe(new Date(now).toISOString()); + }); + }); }); const createMockStaticConfiguration = (sources: any) => ({ diff --git a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.ts b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.ts index b2a8f0281b9e2..9265e8089e915 100644 --- a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.ts +++ b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.ts @@ -14,6 +14,7 @@ import { buildNoDataAlertReason, stateToAlertMessage, } from '../common/messages'; +import { createFormatter } from '../../../../common/formatters'; import { AlertStates } from './types'; import { evaluateAlert } from './lib/evaluate_alert'; @@ -59,7 +60,7 @@ export const createMetricThresholdExecutor = (libs: InfraBackendLibs) => let reason; if (nextState === AlertStates.ALERT) { reason = alertResults - .map((result) => buildFiredAlertReason(result[group] as any)) + .map((result) => buildFiredAlertReason(formatAlertResult(result[group]) as any)) .join('\n'); } if (alertOnNoData) { @@ -83,8 +84,14 @@ export const createMetricThresholdExecutor = (libs: InfraBackendLibs) => alertState: stateToAlertMessage[nextState], reason, timestamp, - value: mapToConditionsLookup(alertResults, (result) => result[group].currentValue), - threshold: mapToConditionsLookup(criteria, (c) => c.threshold), + value: mapToConditionsLookup( + alertResults, + (result) => formatAlertResult(result[group]).currentValue + ), + threshold: mapToConditionsLookup( + alertResults, + (result) => formatAlertResult(result[group]).threshold + ), metric: mapToConditionsLookup(criteria, (c) => c.metric), }); } @@ -113,3 +120,18 @@ const mapToConditionsLookup = ( (result: Record, value, i) => ({ ...result, [`condition${i}`]: value }), {} ); + +const formatAlertResult = (alertResult: { + metric: string; + currentValue: number; + threshold: number[]; +}) => { + const { metric, currentValue, threshold } = alertResult; + if (!metric.endsWith('.pct')) return alertResult; + const formatter = createFormatter('percent'); + return { + ...alertResult, + currentValue: formatter(currentValue), + threshold: Array.isArray(threshold) ? threshold.map((v: number) => formatter(v)) : threshold, + }; +}; diff --git a/x-pack/plugins/task_manager/server/polling/index.ts b/x-pack/plugins/task_manager/server/polling/index.ts new file mode 100644 index 0000000000000..5c1f06eaeb256 --- /dev/null +++ b/x-pack/plugins/task_manager/server/polling/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export { createObservableMonitor } from './observable_monitor'; +export { createTaskPoller, PollingError, PollingErrorType } from './task_poller'; +export { timeoutPromiseAfter } from './timeout_promise_after'; diff --git a/x-pack/plugins/task_manager/server/polling/observable_monitor.test.ts b/x-pack/plugins/task_manager/server/polling/observable_monitor.test.ts new file mode 100644 index 0000000000000..0b7bbdfb623e5 --- /dev/null +++ b/x-pack/plugins/task_manager/server/polling/observable_monitor.test.ts @@ -0,0 +1,170 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { interval, from, Subject } from 'rxjs'; +import { map, concatMap, takeWhile, take } from 'rxjs/operators'; + +import { createObservableMonitor } from './observable_monitor'; +import { times } from 'lodash'; + +describe('Poll Monitor', () => { + test('returns a cold observable so that the monitored Observable is only created on demand', async () => { + const instantiator = jest.fn(() => new Subject()); + + createObservableMonitor(instantiator); + + expect(instantiator).not.toHaveBeenCalled(); + }); + + test('subscribing to the observable instantiates a new observable and pipes its results through', async () => { + const instantiator = jest.fn(() => from([0, 1, 2])); + const monitoredObservable = createObservableMonitor(instantiator); + + expect(instantiator).not.toHaveBeenCalled(); + + return new Promise((resolve) => { + const next = jest.fn(); + monitoredObservable.pipe(take(3)).subscribe({ + next, + complete: () => { + expect(instantiator).toHaveBeenCalled(); + expect(next).toHaveBeenCalledWith(0); + expect(next).toHaveBeenCalledWith(1); + expect(next).toHaveBeenCalledWith(2); + resolve(); + }, + }); + }); + }); + + test('unsubscribing from the monitor prevents the monitor from resubscribing to the observable', async () => { + const heartbeatInterval = 1000; + const instantiator = jest.fn(() => interval(100)); + const monitoredObservable = createObservableMonitor(instantiator, { heartbeatInterval }); + + return new Promise((resolve) => { + const next = jest.fn(); + monitoredObservable.pipe(take(3)).subscribe({ + next, + complete: () => { + expect(instantiator).toHaveBeenCalledTimes(1); + setTimeout(() => { + expect(instantiator).toHaveBeenCalledTimes(1); + resolve(); + }, heartbeatInterval * 2); + }, + }); + }); + }); + + test(`ensures the observable subscription hasn't closed at a fixed interval and reinstantiates if it has`, async () => { + let iteration = 0; + const instantiator = jest.fn(() => { + iteration++; + return interval(100).pipe( + map((index) => `${iteration}:${index}`), + // throw on 3rd value of the first iteration + map((value, index) => { + if (iteration === 1 && index === 3) { + throw new Error('Source threw an error!'); + } + return value; + }) + ); + }); + + const onError = jest.fn(); + const monitoredObservable = createObservableMonitor(instantiator, { onError }); + + return new Promise((resolve) => { + const next = jest.fn(); + const error = jest.fn(); + monitoredObservable + .pipe( + // unsubscribe once we confirm we have successfully recovered from an error in the source + takeWhile(function validateExpectation() { + try { + [...times(3, (index) => `1:${index}`), ...times(5, (index) => `2:${index}`)].forEach( + (expecteArg) => { + expect(next).toHaveBeenCalledWith(expecteArg); + } + ); + return false; + } catch { + return true; + } + }) + ) + .subscribe({ + next, + error, + complete: () => { + expect(error).not.toHaveBeenCalled(); + expect(onError).toHaveBeenCalledWith(new Error('Source threw an error!')); + resolve(); + }, + }); + }); + }); + + test(`ensures the observable subscription hasn't hung at a fixed interval and reinstantiates if it has`, async () => { + let iteration = 0; + const instantiator = jest.fn(() => { + iteration++; + return interval(100).pipe( + map((index) => `${iteration}:${index}`), + // hang on 3rd value of the first iteration + concatMap((value, index) => { + if (iteration === 1 && index === 3) { + return new Promise(() => { + // never resolve or reject, just hang for EVER + }); + } + return Promise.resolve(value); + }) + ); + }); + + const onError = jest.fn(); + const monitoredObservable = createObservableMonitor(instantiator, { + onError, + heartbeatInterval: 100, + inactivityTimeout: 500, + }); + + return new Promise((resolve) => { + const next = jest.fn(); + const error = jest.fn(); + monitoredObservable + .pipe( + // unsubscribe once we confirm we have successfully recovered from an error in the source + takeWhile(function validateExpectation() { + try { + [...times(3, (index) => `1:${index}`), ...times(5, (index) => `2:${index}`)].forEach( + (expecteArg) => { + expect(next).toHaveBeenCalledWith(expecteArg); + } + ); + return false; + } catch { + return true; + } + }) + ) + .subscribe({ + next, + error, + complete: () => { + expect(error).not.toHaveBeenCalled(); + expect(onError).toHaveBeenCalledWith( + new Error(`Observable Monitor: Hung Observable restarted after 500ms of inactivity`) + ); + resolve(); + }, + }); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/polling/observable_monitor.ts b/x-pack/plugins/task_manager/server/polling/observable_monitor.ts new file mode 100644 index 0000000000000..7b06117ef59d1 --- /dev/null +++ b/x-pack/plugins/task_manager/server/polling/observable_monitor.ts @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { Subject, Observable, throwError, interval, timer, Subscription } from 'rxjs'; +import { exhaustMap, tap, takeUntil, switchMap, switchMapTo, catchError } from 'rxjs/operators'; +import { noop } from 'lodash'; + +const DEFAULT_HEARTBEAT_INTERVAL = 1000; + +// by default don't monitor inactivity as not all observables are expected +// to emit at any kind of fixed interval +const DEFAULT_INACTIVITY_TIMEOUT = 0; + +export interface ObservableMonitorOptions { + heartbeatInterval?: number; + inactivityTimeout?: number; + onError?: (err: E) => void; +} + +export function createObservableMonitor( + observableFactory: () => Observable, + { + heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL, + inactivityTimeout = DEFAULT_INACTIVITY_TIMEOUT, + onError = noop, + }: ObservableMonitorOptions = {} +): Observable { + return new Observable((subscriber) => { + const subscription: Subscription = interval(heartbeatInterval) + .pipe( + // switch from the heartbeat interval to the instantiated observable until it completes / errors + exhaustMap(() => takeUntilDurationOfInactivity(observableFactory(), inactivityTimeout)), + // if an error is thrown, catch it, notify and try to recover + catchError((err: E, source$: Observable) => { + onError(err); + // return source, which will allow our observable to recover from this error and + // keep pulling values out of it + return source$; + }) + ) + .subscribe(subscriber); + return () => { + subscription.unsubscribe(); + }; + }); +} + +function takeUntilDurationOfInactivity(source$: Observable, inactivityTimeout: number) { + // if there's a specified maximum duration of inactivity, only take values until that + // duration elapses without any new events + if (inactivityTimeout) { + // an observable which starts a timer every time a new value is passed in, replacing the previous timer + // if the timer goes off without having been reset by a fresh value, it will emit a single event - which will + // notify our monitor that the source has been inactive for too long + const inactivityMonitor$ = new Subject(); + return source$.pipe( + takeUntil( + inactivityMonitor$.pipe( + // on each new emited value, start a new timer, discarding the old one + switchMap(() => timer(inactivityTimeout)), + // every time a timer expires (meaning no new value came in on time to discard it) + // throw an error, forcing the monitor instantiate a new observable + switchMapTo( + throwError( + new Error( + `Observable Monitor: Hung Observable restarted after ${inactivityTimeout}ms of inactivity` + ) + ) + ) + ) + ), + // poke `inactivityMonitor$` so it restarts the timer + tap(() => inactivityMonitor$.next()) + ); + } + return source$; +} diff --git a/x-pack/plugins/task_manager/server/task_poller.test.ts b/x-pack/plugins/task_manager/server/polling/task_poller.test.ts similarity index 99% rename from x-pack/plugins/task_manager/server/task_poller.test.ts rename to x-pack/plugins/task_manager/server/polling/task_poller.test.ts index 98e6d0f9388a4..607e2ac2b80fa 100644 --- a/x-pack/plugins/task_manager/server/task_poller.test.ts +++ b/x-pack/plugins/task_manager/server/polling/task_poller.test.ts @@ -9,8 +9,8 @@ import { Subject } from 'rxjs'; import { Option, none, some } from 'fp-ts/lib/Option'; import { createTaskPoller, PollingError, PollingErrorType } from './task_poller'; import { fakeSchedulers } from 'rxjs-marbles/jest'; -import { sleep, resolvable, Resolvable } from './test_utils'; -import { asOk, asErr } from './lib/result_type'; +import { sleep, resolvable, Resolvable } from '../test_utils'; +import { asOk, asErr } from '../lib/result_type'; describe('TaskPoller', () => { beforeEach(() => jest.useFakeTimers()); diff --git a/x-pack/plugins/task_manager/server/task_poller.ts b/x-pack/plugins/task_manager/server/polling/task_poller.ts similarity index 97% rename from x-pack/plugins/task_manager/server/task_poller.ts rename to x-pack/plugins/task_manager/server/polling/task_poller.ts index 88511f42f96fb..a1435ffafe8f8 100644 --- a/x-pack/plugins/task_manager/server/task_poller.ts +++ b/x-pack/plugins/task_manager/server/polling/task_poller.ts @@ -15,7 +15,7 @@ import { mapTo, filter, scan, concatMap, tap, catchError } from 'rxjs/operators' import { pipe } from 'fp-ts/lib/pipeable'; import { Option, none, map as mapOptional, getOrElse } from 'fp-ts/lib/Option'; -import { pullFromSet } from './lib/pull_from_set'; +import { pullFromSet } from '../lib/pull_from_set'; import { Result, Err, @@ -24,8 +24,8 @@ import { asOk, asErr, promiseResult, -} from './lib/result_type'; -import { timeoutPromiseAfter } from './lib/timeout_promise_after'; +} from '../lib/result_type'; +import { timeoutPromiseAfter } from './timeout_promise_after'; type WorkFn = (...params: T[]) => Promise; diff --git a/x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts b/x-pack/plugins/task_manager/server/polling/timeout_promise_after.test.ts similarity index 100% rename from x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts rename to x-pack/plugins/task_manager/server/polling/timeout_promise_after.test.ts diff --git a/x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts b/x-pack/plugins/task_manager/server/polling/timeout_promise_after.ts similarity index 100% rename from x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts rename to x-pack/plugins/task_manager/server/polling/timeout_promise_after.ts diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts index 2c812f0da516d..fb2d5e07030a4 100644 --- a/x-pack/plugins/task_manager/server/task_manager.ts +++ b/x-pack/plugins/task_manager/server/task_manager.ts @@ -46,7 +46,12 @@ import { TaskStatus, ElasticJs, } from './task'; -import { createTaskPoller, PollingError, PollingErrorType } from './task_poller'; +import { + createTaskPoller, + PollingError, + PollingErrorType, + createObservableMonitor, +} from './polling'; import { TaskPool } from './task_pool'; import { TaskManagerRunner, TaskRunner } from './task_runner'; import { @@ -154,18 +159,38 @@ export class TaskManager { maxWorkers: opts.config.max_workers, }); - this.poller$ = createTaskPoller({ - pollInterval: opts.config.poll_interval, - bufferCapacity: opts.config.request_capacity, - getCapacity: () => this.pool.availableWorkers, - pollRequests$: this.claimRequests$, - work: this.pollForWork, - // Time out the `work` phase if it takes longer than a certain number of polling cycles - // The `work` phase includes the prework needed *before* executing a task - // (such as polling for new work, marking tasks as running etc.) but does not - // include the time of actually running the task - workTimeout: opts.config.poll_interval * opts.config.max_poll_inactivity_cycles, - }); + const { + max_poll_inactivity_cycles: maxPollInactivityCycles, + poll_interval: pollInterval, + } = opts.config; + this.poller$ = createObservableMonitor>, Error>( + () => + createTaskPoller({ + pollInterval, + bufferCapacity: opts.config.request_capacity, + getCapacity: () => this.pool.availableWorkers, + pollRequests$: this.claimRequests$, + work: this.pollForWork, + // Time out the `work` phase if it takes longer than a certain number of polling cycles + // The `work` phase includes the prework needed *before* executing a task + // (such as polling for new work, marking tasks as running etc.) but does not + // include the time of actually running the task + workTimeout: pollInterval * maxPollInactivityCycles, + }), + { + heartbeatInterval: pollInterval, + // Time out the poller itself if it has failed to complete the entire stream for a certain amount of time. + // This is different that the `work` timeout above, as the poller could enter an invalid state where + // it fails to complete a cycle even thought `work` is completing quickly. + // We grant it a single cycle longer than the time alotted to `work` so that timing out the `work` + // doesn't get short circuited by the monitor reinstantiating the poller all together (a far more expensive + // operation than just timing out the `work` internally) + inactivityTimeout: pollInterval * (maxPollInactivityCycles + 1), + onError: (error) => { + this.logger.error(`[Task Poller Monitor]: ${error.message}`); + }, + } + ); } private emitEvent = (event: TaskLifecycleEvent) => {