diff --git a/x-pack/plugins/infra/common/http_api/log_analysis/results/index.ts b/x-pack/plugins/infra/common/http_api/log_analysis/results/index.ts index cbd89db97236f..a01042616a872 100644 --- a/x-pack/plugins/infra/common/http_api/log_analysis/results/index.ts +++ b/x-pack/plugins/infra/common/http_api/log_analysis/results/index.ts @@ -10,3 +10,4 @@ export * from './log_entry_category_examples'; export * from './log_entry_rate'; export * from './log_entry_examples'; export * from './log_entry_anomalies'; +export * from './log_entry_anomalies_datasets';
diff --git a/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies.ts b/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies.ts index eeca2cebaa028..17abe7235ae89 100644 --- a/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies.ts +++ b/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies.ts @@ -112,6 +112,8 @@ export const getLogEntryAnomaliesRequestPayloadRT = rt.type({ pagination: paginationRT, // Sort properties sort: sortRT, + // Dataset filters + datasets: rt.array(rt.string), }), ]), });
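The added `datasets` field carries the dataset filter as a plain array of dataset names. A minimal sketch of how the extended codec behaves, assuming the field sits in an rt.partial group as the surrounding closing brackets suggest; the sample dataset names are hypothetical:

import * as rt from 'io-ts';
import { isRight } from 'fp-ts/lib/Either';

// Mirrors only the dataset-filter portion of getLogEntryAnomaliesRequestPayloadRT.
const datasetsFilterRT = rt.partial({
  datasets: rt.array(rt.string),
});

isRight(datasetsFilterRT.decode({})); // true — omitting the filter stays valid
isRight(datasetsFilterRT.decode({ datasets: ['nginx.access'] })); // true
isRight(datasetsFilterRT.decode({ datasets: [42] })); // false — entries must be strings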
diff --git a/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies_datasets.ts b/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies_datasets.ts new file mode 100644 index 0000000000000..56784dba1be44 --- /dev/null +++ b/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies_datasets.ts @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import * as rt from 'io-ts'; + +import { + badRequestErrorRT, + forbiddenErrorRT, + timeRangeRT, + routeTimingMetadataRT, +} from '../../shared'; + +export const LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH = + '/api/infra/log_analysis/results/log_entry_anomalies_datasets'; + +/** + * request + */ + +export const getLogEntryAnomaliesDatasetsRequestPayloadRT = rt.type({ + data: rt.type({ + // the id of the source configuration + sourceId: rt.string, + // the time range to fetch the anomalies datasets from + timeRange: timeRangeRT, + }), +}); + +export type GetLogEntryAnomaliesDatasetsRequestPayload = rt.TypeOf< + typeof getLogEntryAnomaliesDatasetsRequestPayloadRT +>; + +/** + * response + */ + +export const getLogEntryAnomaliesDatasetsSuccessResponsePayloadRT = rt.intersection([ + rt.type({ + data: rt.type({ + datasets: rt.array(rt.string), + }), + }), + rt.partial({ + timing: routeTimingMetadataRT, + }), +]); + +export type GetLogEntryAnomaliesDatasetsSuccessResponsePayload = rt.TypeOf< + typeof getLogEntryAnomaliesDatasetsSuccessResponsePayloadRT +>; + +export const getLogEntryAnomaliesDatasetsResponsePayloadRT = rt.union([ + getLogEntryAnomaliesDatasetsSuccessResponsePayloadRT, + badRequestErrorRT, + forbiddenErrorRT, +]); + +export type GetLogEntryAnomaliesDatasetsResponsePayload = rt.TypeOf< + typeof getLogEntryAnomaliesDatasetsResponsePayloadRT >;
diff --git a/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_rate.ts b/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_rate.ts index b7e8a49735152..20a8e5c378cec 100644 --- a/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_rate.ts +++ b/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_rate.ts @@ -16,11 +16,16 @@ export const LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH = */ export const getLogEntryRateRequestPayloadRT = rt.type({ - data: rt.type({ - bucketDuration: rt.number, - sourceId: rt.string, - timeRange: timeRangeRT, - }), + data: rt.intersection([ + rt.type({ + bucketDuration: rt.number, + sourceId: rt.string, + timeRange: timeRangeRT, + }), + rt.partial({ + datasets: rt.array(rt.string), + }), + ]), }); export type GetLogEntryRateRequestPayload = rt.TypeOf<typeof getLogEntryRateRequestPayloadRT>;
diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/datasets_selector.tsx b/x-pack/plugins/infra/public/components/logging/log_analysis_results/datasets_selector.tsx similarity index 92% rename from x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/datasets_selector.tsx rename to x-pack/plugins/infra/public/components/logging/log_analysis_results/datasets_selector.tsx index ab938ff1d1374..2236dc9e45da6 100644 --- a/x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/datasets_selector.tsx +++ b/x-pack/plugins/infra/public/components/logging/log_analysis_results/datasets_selector.tsx @@ -8,7 +8,7 @@ import { EuiComboBox, EuiComboBoxOptionOption } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; import React, { useCallback, useMemo } from 'react'; -import { getFriendlyNameForPartitionId } from '../../../../../../common/log_analysis'; +import { getFriendlyNameForPartitionId } from '../../../../common/log_analysis'; type DatasetOptionProps = EuiComboBoxOptionOption<string>; @@ -51,7 +51,7 @@ export const DatasetsSelector: React.FunctionComponent<{ }; const datasetFilterPlaceholder = i18n.translate( - 'xpack.infra.logs.logEntryCategories.datasetFilterPlaceholder', + 'xpack.infra.logs.analysis.datasetFilterPlaceholder', { defaultMessage: 'Filter by datasets', }
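Now that the selector is shared between the categories and rate pages, it keeps operating on plain dataset names and maps them to combo box options internally. A sketch of that mapping under assumed shapes — the prop wiring and the empty-partition fallback of getFriendlyNameForPartitionId are assumptions, not the component's confirmed API:

import type { EuiComboBoxOptionOption } from '@elastic/eui';

// Assumed behavior: the real helper lives in common/log_analysis and gives the
// empty partition id a human-readable label.
const getFriendlyNameForPartitionId = (datasetId: string): string =>
  datasetId === '' ? 'unknown' : datasetId;

const toOptions = (datasets: string[]): Array<EuiComboBoxOptionOption<string>> =>
  datasets.map((dataset) => ({
    value: dataset,
    label: getFriendlyNameForPartitionId(dataset),
  }));

const fromOptions = (options: Array<EuiComboBoxOptionOption<string>>): string[] =>
  options.map(({ value }) => value ?? '');

// toOptions(availableDatasets) would feed the EuiComboBox `options` prop, and an
// onChange handler would map the selection back with fromOptions before calling
// the page's setter.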
diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/top_categories_section.tsx b/x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/top_categories_section.tsx index 37d26de6fce70..ea23bc468bc76 100644 --- a/x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/top_categories_section.tsx +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/top_categories_section.tsx @@ -14,7 +14,7 @@ import { BetaBadge } from '../../../../../components/beta_badge'; import { LoadingOverlayWrapper } from '../../../../../components/loading_overlay_wrapper'; import { RecreateJobButton } from '../../../../../components/logging/log_analysis_job_status'; import { AnalyzeInMlButton } from '../../../../../components/logging/log_analysis_results'; -import { DatasetsSelector } from './datasets_selector'; +import { DatasetsSelector } from '../../../../../components/logging/log_analysis_results/datasets_selector'; import { TopCategoriesTable } from './top_categories_table'; export const TopCategoriesSection: React.FunctionComponent<{
diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/page_results_content.tsx b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/page_results_content.tsx index 85646c39445a1..05967e952953e 100644 --- a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/page_results_content.tsx +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/page_results_content.tsx @@ -21,6 +21,7 @@ import { StringTimeRange, useLogAnalysisResultsUrlState, } from './use_log_entry_rate_results_url_state'; +import { DatasetsSelector } from '../../../components/logging/log_analysis_results/datasets_selector'; const JOB_STATUS_POLLING_INTERVAL = 30000; @@ -76,15 +77,17 @@ export const LogEntryRateResultsContent: React.FunctionComponent<{ + const [selectedDatasets, setSelectedDatasets] = useState<string[]>([]); + const { getLogEntryRate, isLoading, logEntryRate } = useLogEntryRateResults({ sourceId, startTime: queryTimeRange.value.startTime, endTime: queryTimeRange.value.endTime, bucketDuration, + filteredDatasets: selectedDatasets, }); const { - getLogEntryAnomalies, isLoadingLogEntryAnomalies, logEntryAnomalies, page, @@ -94,6 +97,8 @@ export const LogEntryRateResultsContent: React.FunctionComponent<{ sortOptions, paginationOptions, + datasets, + isLoadingDatasets, } = useLogEntryAnomaliesResults({ useEffect(() => { getLogEntryRate(); - }, [getLogEntryRate, getLogEntryAnomalies, queryTimeRange.lastChangedTime]); + }, [getLogEntryRate, selectedDatasets, queryTimeRange.lastChangedTime]); useEffect(() => { fetchModuleDefinition(); @@ -197,7 +203,15 @@ export const LogEntryRateResultsContent: React.FunctionComponent<{ - <EuiFlexItem grow={false}> + <EuiFlexItem grow={false}> + <DatasetsSelector + availableDatasets={datasets} + isLoading={isLoadingDatasets} + onChangeDatasetSelection={setSelectedDatasets} + selectedDatasets={selectedDatasets} + /> + </EuiFlexItem> + <EuiFlexItem grow={false}>
diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_anomalies.ts b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_anomalies.ts --- a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_anomalies.ts +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_anomalies.ts @@ -19,7 +19,8 @@ export const callGetLogEntryAnomaliesAPI = async ( sourceId: string, startTime: number, endTime: number, sort: Sort, - pagination: Pagination + pagination: Pagination, + datasets?: string[] ) => { const response = await npStart.http.fetch(LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_PATH, { method: 'POST', @@ -32,6 +33,7 @@ }, sort, pagination, + datasets, }, }) ),
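Every service call in this PR follows the same transport pattern: encode the request payload with its codec, POST it, decode the response, and throw a plain error when the decode fails. A self-contained sketch of that pipeline, assuming only io-ts and fp-ts; the codec and payload below are illustrative stand-ins, not the plugin's real ones:

import * as rt from 'io-ts';
import { fold } from 'fp-ts/lib/Either';
import { pipe } from 'fp-ts/lib/pipeable';
import { identity } from 'fp-ts/lib/function';

// Generic "decode or throw", mirroring fold(throwErrors(createPlainError), identity).
const decodeOrThrowWith = <C extends rt.Mixed>(codec: C) => (value: unknown): rt.TypeOf<C> =>
  pipe(
    codec.decode(value),
    fold((errors) => {
      throw new Error(`Failed to decode payload: ${errors.length} validation error(s)`);
    }, identity)
  );

const successResponseRT = rt.type({ data: rt.type({ datasets: rt.array(rt.string) }) });

// After `const response = await http.fetch(...)`:
const parsed = decodeOrThrowWith(successResponseRT)({ data: { datasets: ['system.syslog'] } });
// parsed.data.datasets is now a typed string[] rather than `unknown`.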
diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_anomalies_datasets.ts b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_anomalies_datasets.ts new file mode 100644 index 0000000000000..acc57e7758530 --- /dev/null +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_anomalies_datasets.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { fold } from 'fp-ts/lib/Either'; +import { pipe } from 'fp-ts/lib/pipeable'; +import { identity } from 'fp-ts/lib/function'; +import { npStart } from '../../../../legacy_singletons'; + +import { + getLogEntryAnomaliesDatasetsRequestPayloadRT, + getLogEntryAnomaliesDatasetsSuccessResponsePayloadRT, + LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH, +} from '../../../../../common/http_api/log_analysis'; +import { createPlainError, throwErrors } from '../../../../../common/runtime_types'; + +export const callGetLogEntryAnomaliesDatasetsAPI = async ( + sourceId: string, + startTime: number, + endTime: number +) => { + const response = await npStart.http.fetch(LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH, { + method: 'POST', + body: JSON.stringify( + getLogEntryAnomaliesDatasetsRequestPayloadRT.encode({ + data: { + sourceId, + timeRange: { + startTime, + endTime, + }, + }, + }) + ), + }); + + return pipe( + getLogEntryAnomaliesDatasetsSuccessResponsePayloadRT.decode(response), + fold(throwErrors(createPlainError), identity) + ); +};
diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_rate.ts b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_rate.ts index 794139385f467..77111d279309d 100644 --- a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_rate.ts +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_rate.ts @@ -19,7 +19,8 @@ export const callGetLogEntryRateAPI = async ( sourceId: string, startTime: number, endTime: number, - bucketDuration: number + bucketDuration: number, + datasets?: string[] ) => { const response = await npStart.http.fetch(LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH, { method: 'POST', @@ -32,6 +33,7 @@ endTime, }, bucketDuration, + datasets, }, }) ),
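Because the new parameter is optional and last, existing call sites compile unchanged; filtering callers simply append the dataset names. A usage sketch with a hypothetical source id and time range:

import { callGetLogEntryRateAPI } from './get_log_entry_rate';

const fetchRateExamples = async () => {
  const endTime = Date.now();
  const startTime = endTime - 24 * 60 * 60 * 1000;
  const bucketDuration = 15 * 60 * 1000;

  // Unfiltered — behaves exactly like the previous four-argument version.
  const allBuckets = await callGetLogEntryRateAPI('default', startTime, endTime, bucketDuration);

  // Filtered — the server folds the names into a terms filter on partition_field_value.
  const nginxBuckets = await callGetLogEntryRateAPI('default', startTime, endTime, bucketDuration, [
    'nginx.access',
    'nginx.error',
  ]);

  return { allBuckets, nginxBuckets };
};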
diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_anomalies_results.ts b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_anomalies_results.ts index 8d8fdfc879114..5a30602b1d155 100644 --- a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_anomalies_results.ts +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_anomalies_results.ts @@ -5,11 +5,17 @@ */ import { useMemo, useState, useCallback, useEffect } from 'react'; - -import { LogEntryAnomaly } from '../../../../common/http_api'; -import { useTrackedPromise } from '../../../utils/use_tracked_promise'; +import { useMount } from 'react-use'; +import { useTrackedPromise, CanceledPromiseError } from '../../../utils/use_tracked_promise'; import { callGetLogEntryAnomaliesAPI } from './service_calls/get_log_entry_anomalies'; +import { callGetLogEntryAnomaliesDatasetsAPI } from './service_calls/get_log_entry_anomalies_datasets'; +import { + Sort, + Pagination, + PaginationCursor, + GetLogEntryAnomaliesDatasetsSuccessResponsePayload, + LogEntryAnomaly, +} from '../../../../common/http_api/log_analysis'; export type SortOptions = Sort; export type PaginationOptions = Pick<Pagination, 'pageSize'>; export type FetchNextPage = () => void; export type FetchPreviousPage = () => void; export type ChangeSortOptions = (sortOptions: Sort) => void; export type ChangePaginationOptions = (paginationOptions: PaginationOptions) => void; export type LogEntryAnomalies = LogEntryAnomaly[]; +type LogEntryAnomaliesDatasets = GetLogEntryAnomaliesDatasetsSuccessResponsePayload['data']['datasets']; interface PaginationCursors { previousPageCursor: PaginationCursor; nextPageCursor: PaginationCursor; } @@ -31,6 +38,8 @@ export const useLogEntryAnomaliesResults = ({ lastChangedTime, defaultSortOptions, defaultPaginationOptions, + onGetLogEntryAnomaliesDatasetsError, + filteredDatasets, }: { endTime: number; startTime: number; sourceId: string; lastChangedTime: number; defaultSortOptions: Sort; defaultPaginationOptions: Pick<Pagination, 'pageSize'>; + onGetLogEntryAnomaliesDatasetsError?: (error: Error) => void; + filteredDatasets?: string[]; }) => { // Pagination const [page, setPage] = useState(1); @@ -66,10 +77,17 @@ { cancelPreviousOn: 'creation', createPromise: async () => { - return await callGetLogEntryAnomaliesAPI(sourceId, startTime, endTime, sortOptions, { - ...paginationOptions, - cursor: paginationCursor, - }); + return await callGetLogEntryAnomaliesAPI( + sourceId, + startTime, + endTime, + sortOptions, + { + ...paginationOptions, + cursor: paginationCursor, + }, + filteredDatasets + ); }, onResolve: ({ data: { anomalies, paginationCursors: requestCursors, hasMoreEntries } }) => { if (requestCursors) { @@ -86,7 +104,16 @@ setLogEntryAnomalies(anomalies); }, }, - [endTime, sourceId, startTime, sortOptions, paginationOptions, setHasNextPage, paginationCursor] + [ + endTime, + sourceId, + startTime, + sortOptions, + paginationOptions, + setHasNextPage, + paginationCursor, + filteredDatasets, + ] ); const changeSortOptions = useCallback( @@ -106,14 +133,14 @@ ); useEffect(() => { - // Time range has changed + // Time range or dataset filters have changed resetPagination(); - }, [lastChangedTime, resetPagination]); + }, [lastChangedTime, filteredDatasets, resetPagination]); useEffect(() => { // Refetch entries when options change getLogEntryAnomalies(); - }, [sortOptions, paginationOptions, getLogEntryAnomalies, paginationCursor]); + }, [sortOptions, paginationOptions, getLogEntryAnomalies, page, paginationCursor]); const handleFetchNextPage = useCallback(() => { if (lastReceivedCursors) { @@ -143,10 +170,53 @@ [getLogEntryAnomaliesRequest.state] ); + // Anomalies datasets + const [logEntryAnomaliesDatasets, setLogEntryAnomaliesDatasets] = useState< LogEntryAnomaliesDatasets >([]); + + const [getLogEntryAnomaliesDatasetsRequest, getLogEntryAnomaliesDatasets] = useTrackedPromise( + { + cancelPreviousOn: 'creation', + createPromise: async () => { + return await callGetLogEntryAnomaliesDatasetsAPI(sourceId, startTime, endTime); + }, + onResolve: ({ data: { datasets } }) => { + setLogEntryAnomaliesDatasets(datasets); + }, + onReject: (error) => { + if ( + error instanceof Error && + !(error instanceof CanceledPromiseError) && + onGetLogEntryAnomaliesDatasetsError + ) { + onGetLogEntryAnomaliesDatasetsError(error); + } + }, + }, + [endTime, sourceId, startTime] + ); + + const isLoadingDatasets = useMemo(() => getLogEntryAnomaliesDatasetsRequest.state === 'pending', [ + getLogEntryAnomaliesDatasetsRequest.state, + ]); + + const hasFailedLoadingDatasets = useMemo( + () => getLogEntryAnomaliesDatasetsRequest.state === 'rejected', + [getLogEntryAnomaliesDatasetsRequest.state] + ); + + useMount(() => { + getLogEntryAnomaliesDatasets(); + }); + return { logEntryAnomalies, getLogEntryAnomalies, isLoadingLogEntryAnomalies, + isLoadingDatasets, + hasFailedLoadingDatasets, + datasets: logEntryAnomaliesDatasets, hasFailedLoadingLogEntryAnomalies, changeSortOptions, sortOptions,
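For orientation, a consumption sketch of the extended hook; the source id, time values, and sort shape below are hypothetical, and only fields visible in this diff are read from the result:

import React from 'react';
import { useLogEntryAnomaliesResults } from './use_log_entry_anomalies_results';

export const AnomaliesDatasetsExample: React.FC = () => {
  const { datasets, isLoadingDatasets, logEntryAnomalies } = useLogEntryAnomaliesResults({
    sourceId: 'default', // hypothetical source configuration id
    startTime: Date.now() - 24 * 60 * 60 * 1000,
    endTime: Date.now(),
    lastChangedTime: Date.now(),
    defaultSortOptions: { field: 'anomalyScore', direction: 'desc' }, // assumed Sort shape
    defaultPaginationOptions: { pageSize: 25 },
    filteredDatasets: ['nginx.access'], // narrows the anomalies query server-side
    onGetLogEntryAnomaliesDatasetsError: (error) => console.error(error),
  });

  // `datasets` backs the selector options; the loading flag drives its spinner.
  return (
    <div>
      {isLoadingDatasets
        ? 'Loading datasets…'
        : `${datasets.length} datasets, ${logEntryAnomalies.length} anomalies`}
    </div>
  );
};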
diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_rate_results.ts b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_rate_results.ts index 1cd27c64af53f..a52dab58cb018 100644 --- a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_rate_results.ts +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_rate_results.ts @@ -41,11 +41,13 @@ export const useLogEntryRateResults = ({ startTime, endTime, bucketDuration = 15 * 60 * 1000, + filteredDatasets, }: { sourceId: string; startTime: number; endTime: number; bucketDuration: number; + filteredDatasets?: string[]; }) => { const [logEntryRate, setLogEntryRate] = useState<LogEntryRateResults | null>(null); const [getLogEntryRateRequest, getLogEntryRate] = useTrackedPromise( { cancelPreviousOn: 'resolution', createPromise: async () => { - return await callGetLogEntryRateAPI(sourceId, startTime, endTime, bucketDuration); + return await callGetLogEntryRateAPI( + sourceId, + startTime, + endTime, + bucketDuration, + filteredDatasets + ); }, onResolve: ({ data }) => { setLogEntryRate({ @@ -68,7 +76,7 @@ setLogEntryRate(null); }, }, - [sourceId, startTime, endTime, bucketDuration] + [sourceId, startTime, endTime, bucketDuration, filteredDatasets] ); const isLoading = useMemo(() => getLogEntryRateRequest.state === 'pending', [
diff --git a/x-pack/plugins/infra/server/infra_server.ts b/x-pack/plugins/infra/server/infra_server.ts index 6596e07ebaca5..c080618f2a563 100644 --- a/x-pack/plugins/infra/server/infra_server.ts +++ b/x-pack/plugins/infra/server/infra_server.ts @@ -19,6 +19,7 @@ import { initValidateLogAnalysisDatasetsRoute, initValidateLogAnalysisIndicesRoute, initGetLogEntryAnomaliesRoute, + initGetLogEntryAnomaliesDatasetsRoute, } from './routes/log_analysis'; import { initMetricExplorerRoute } from './routes/metrics_explorer'; import { initMetadataRoute } from './routes/metadata'; @@ -53,6 +54,7 @@ export const initInfraServer = (libs: InfraBackendLibs) => { initGetLogEntryCategoryExamplesRoute(libs); initGetLogEntryRateRoute(libs); initGetLogEntryAnomaliesRoute(libs); + initGetLogEntryAnomaliesDatasetsRoute(libs); initSnapshotRoute(libs); initNodeDetailsRoute(libs); initSourceRoute(libs);
diff --git a/x-pack/plugins/infra/server/lib/log_analysis/common.ts b/x-pack/plugins/infra/server/lib/log_analysis/common.ts index 0c0b0a0f19982..218281d875a46 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/common.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/common.ts @@ -4,10 +4,19 @@ * you may not use this file except in compliance with the Elastic License. */ -import type { MlAnomalyDetectors } from '../../types'; -import { startTracingSpan } from '../../../common/performance_tracing'; +import type { MlAnomalyDetectors, MlSystem } from '../../types'; import { NoLogAnalysisMlJobError } from './errors'; +import { + CompositeDatasetKey, + createLogEntryDatasetsQuery, + LogEntryDatasetBucket, + logEntryDatasetsResponseRT, +} from './queries/log_entry_data_sets'; +import { decodeOrThrow } from '../../../common/runtime_types'; +import { NoLogAnalysisResultsIndexError } from './errors'; +import { startTracingSpan, TracingSpan } from '../../../common/performance_tracing'; + export async function fetchMlJob(mlAnomalyDetectors: MlAnomalyDetectors, jobId: string) { const finalizeMlGetJobSpan = startTracingSpan('Fetch ml job from ES'); const { @@ -27,3 +36,63 @@ export async function fetchMlJob(mlAnomalyDetectors: MlAnomalyDetectors, jobId: }, }; } + +const COMPOSITE_AGGREGATION_BATCH_SIZE = 1000; + +// Finds datasets related to ML job ids +export async function getLogEntryDatasets( + mlSystem: MlSystem, + startTime: number, + endTime: number, + jobIds: string[] +) { + const finalizeLogEntryDatasetsSpan = startTracingSpan('get data sets'); + + let logEntryDatasetBuckets: LogEntryDatasetBucket[] = []; + let afterLatestBatchKey: CompositeDatasetKey | undefined; + let esSearchSpans: TracingSpan[] = []; + + while (true) { + const finalizeEsSearchSpan = startTracingSpan('fetch log entry dataset batch from ES'); + + const logEntryDatasetsResponse = decodeOrThrow(logEntryDatasetsResponseRT)( + await mlSystem.mlAnomalySearch( + createLogEntryDatasetsQuery( + jobIds, + startTime, + endTime, + COMPOSITE_AGGREGATION_BATCH_SIZE, + afterLatestBatchKey + ) + ) + ); + + if (logEntryDatasetsResponse._shards.total === 0) { + throw new NoLogAnalysisResultsIndexError( + `Failed to find ml indices for jobs: ${jobIds.join(', ')}.` + ); + } + + const { + after_key: afterKey, + buckets: latestBatchBuckets, + } = logEntryDatasetsResponse.aggregations.dataset_buckets; + + logEntryDatasetBuckets = [...logEntryDatasetBuckets, ...latestBatchBuckets]; + afterLatestBatchKey = afterKey; + esSearchSpans = [...esSearchSpans, finalizeEsSearchSpan()]; + + if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) { + break; + } + } + + const logEntryDatasetsSpan = finalizeLogEntryDatasetsSpan(); + + return { + data: logEntryDatasetBuckets.map((logEntryDatasetBucket) => logEntryDatasetBucket.key.dataset), + timing: { + spans: [logEntryDatasetsSpan, ...esSearchSpans], + }, + };
+}
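getLogEntryDatasets pages through the ML results with a composite aggregation rather than a single terms aggregation, so arbitrarily many datasets can be collected in fixed-size batches. A sketch of the request shape the loop submits, assuming the filter helpers expand to the usual job_id / timestamp / result_type clauses; the exact defaultRequestParameters are not shown in this diff:

const createDatasetsCompositeQuery = (
  jobIds: string[],
  startTime: number,
  endTime: number,
  size: number,
  afterKey?: { dataset: string }
) => ({
  allowNoIndices: true, // assumed default request parameters
  ignoreUnavailable: true,
  body: {
    query: {
      bool: {
        filter: [
          { terms: { job_id: jobIds } },
          { range: { timestamp: { gte: startTime, lt: endTime } } },
          { terms: { result_type: ['model_plot'] } },
        ],
      },
    },
    aggs: {
      dataset_buckets: {
        composite: {
          size,
          sources: [{ dataset: { terms: { field: 'partition_field_value', order: 'asc' } } }],
          // Resuming a batch run: pass the previous response's after_key here.
          ...(afterKey ? { after: afterKey } : {}),
        },
      },
    },
    size: 0,
  },
});

// Loop pattern: submit a batch, append `aggregations.dataset_buckets.buckets`,
// set `afterKey` from `after_key`, and stop once a batch returns fewer than
// `size` buckets.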
diff --git a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_anomalies.ts b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_anomalies.ts index 5bc30985724ba..332a3c90fcc58 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_anomalies.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_anomalies.ts @@ -7,30 +7,25 @@ import { RequestHandlerContext } from 'src/core/server'; import { InfraRequestHandlerContext } from '../../types'; import { TracingSpan, startTracingSpan } from '../../../common/performance_tracing'; -import { fetchMlJob } from './common'; +import { fetchMlJob, getLogEntryDatasets } from './common'; import { getJobId, logEntryCategoriesJobTypes } from '../../../common/log_analysis'; -import { Sort, Pagination } from '../../../common/http_api/log_analysis'; -import type { MlSystem } from '../../types'; +import { + Sort, + Pagination, + GetLogEntryAnomaliesRequestPayload, +} from '../../../common/http_api/log_analysis'; +import type { MlSystem, MlAnomalyDetectors } from '../../types'; import { createLogEntryAnomaliesQuery, logEntryAnomaliesResponseRT } from './queries'; import { decodeOrThrow } from '../../../common/runtime_types'; import { InsufficientAnomalyMlJobsConfigured } from './errors'; -export async function getLogEntryAnomalies( - context: RequestHandlerContext & { infra: Required<InfraRequestHandlerContext> }, +async function getCompatibleAnomaliesJobIds( + spaceId: string, sourceId: string, - startTime: number, - endTime: number, - sort: Sort, - pagination: Pagination + mlAnomalyDetectors: MlAnomalyDetectors ) { - const finalizeLogEntryAnomaliesSpan = startTracingSpan('get log entry anomalies'); - - const logRateJobId = getJobId(context.infra.spaceId, sourceId, 'log-entry-rate'); - const logCategoriesJobId = getJobId( - context.infra.spaceId, - sourceId, - logEntryCategoriesJobTypes[0] - ); + const logRateJobId = getJobId(spaceId, sourceId, 'log-entry-rate'); + const logCategoriesJobId = getJobId(spaceId, sourceId, logEntryCategoriesJobTypes[0]); const jobIds: string[] = []; let jobSpans: TracingSpan[] = []; @@ -38,7 +33,7 @@ try { const { timing: { spans }, - } = await fetchMlJob(context.infra.mlAnomalyDetectors, logRateJobId); + } = await fetchMlJob(mlAnomalyDetectors, logRateJobId); jobIds.push(logRateJobId); jobSpans = [...jobSpans, ...spans]; } catch (e) { @@ -48,13 +43,39 @@ try { const { timing: { spans }, - } = await fetchMlJob(context.infra.mlAnomalyDetectors, logCategoriesJobId); + } = await fetchMlJob(mlAnomalyDetectors, logCategoriesJobId); jobIds.push(logCategoriesJobId); jobSpans = [...jobSpans, ...spans]; } catch (e) { // Job wasn't found } + return { + jobIds, + timing: { spans: jobSpans }, + }; +} + +export async function getLogEntryAnomalies( + context: RequestHandlerContext & { infra: Required<InfraRequestHandlerContext> }, + sourceId: string, + startTime: number, + endTime: number, + sort: Sort, + pagination: Pagination, + datasets: GetLogEntryAnomaliesRequestPayload['data']['datasets'] +) { + const finalizeLogEntryAnomaliesSpan = startTracingSpan('get log entry anomalies'); + + const { + jobIds, + timing: { spans: jobSpans }, + } = await getCompatibleAnomaliesJobIds( + context.infra.spaceId, + sourceId, + context.infra.mlAnomalyDetectors + ); + if (jobIds.length === 0) { throw new InsufficientAnomalyMlJobsConfigured( 'Log rate or categorisation ML jobs need to be configured to search anomalies' ); } @@ -72,7 +93,8 @@ startTime, endTime, sort, - pagination + pagination, + datasets ); const data = anomalies.map((anomaly) => { @@ -95,7 +117,7 @@ actual, duration, startTime: anomalyStartTime, - type: jobId === logRateJobId ? ('logRate' as const) : ('logCategory' as const), + type: jobId.includes('log-entry-rate') ? ('logRate' as const) : ('logCategory' as const), }; }); @@ -117,7 +139,8 @@ startTime: number, endTime: number, sort: Sort, - pagination: Pagination + pagination: Pagination, + datasets: GetLogEntryAnomaliesRequestPayload['data']['datasets'] ) { // We'll request 1 extra entry on top of our pageSize to determine if there are // more entries to be fetched.
This avoids scenarios where the client side can't @@ -129,7 +152,7 @@ async function fetchLogEntryAnomalies( const results = decodeOrThrow(logEntryAnomaliesResponseRT)( await mlSystem.mlAnomalySearch( - createLogEntryAnomaliesQuery(jobIds, startTime, endTime, sort, expandedPagination) + createLogEntryAnomaliesQuery(jobIds, startTime, endTime, sort, expandedPagination, datasets) ) ); @@ -191,3 +214,43 @@ async function fetchLogEntryAnomalies( }, }; } + +export async function getLogEntryAnomaliesDatasets( + context: { + infra: { + mlSystem: MlSystem; + mlAnomalyDetectors: MlAnomalyDetectors; + spaceId: string; + }; + }, + sourceId: string, + startTime: number, + endTime: number +) { + const { + jobIds, + timing: { spans: jobSpans }, + } = await getCompatibleAnomaliesJobIds( + context.infra.spaceId, + sourceId, + context.infra.mlAnomalyDetectors + ); + + if (jobIds.length === 0) { + throw new InsufficientAnomalyMlJobsConfigured( + 'Log rate or categorisation ML jobs need to be configured to search for anomaly datasets' + ); + } + + const { + data: datasets, + timing: { spans: datasetsSpans }, + } = await getLogEntryDatasets(context.infra.mlSystem, startTime, endTime, jobIds); + + return { + datasets, + timing: { + spans: [...jobSpans, ...datasetsSpans], + }, + }; +} diff --git a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_categories_analysis.ts b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_categories_analysis.ts index 08f098f27c770..485c4defe0c9f 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_categories_analysis.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_categories_analysis.ts @@ -44,7 +44,7 @@ import { topLogEntryCategoriesResponseRT, } from './queries/top_log_entry_categories'; import { InfraSource } from '../sources'; -import { fetchMlJob } from './common'; +import { fetchMlJob, getLogEntryDatasets } from './common'; const COMPOSITE_AGGREGATION_BATCH_SIZE = 1000; @@ -129,61 +129,15 @@ export async function getLogEntryCategoryDatasets( startTime: number, endTime: number ) { - const finalizeLogEntryDatasetsSpan = startTracingSpan('get data sets'); - const logEntryCategoriesCountJobId = getJobId( context.infra.spaceId, sourceId, logEntryCategoriesJobTypes[0] ); - let logEntryDatasetBuckets: LogEntryDatasetBucket[] = []; - let afterLatestBatchKey: CompositeDatasetKey | undefined; - let esSearchSpans: TracingSpan[] = []; - - while (true) { - const finalizeEsSearchSpan = startTracingSpan('fetch category dataset batch from ES'); - - const logEntryDatasetsResponse = decodeOrThrow(logEntryDatasetsResponseRT)( - await context.infra.mlSystem.mlAnomalySearch( - createLogEntryDatasetsQuery( - logEntryCategoriesCountJobId, - startTime, - endTime, - COMPOSITE_AGGREGATION_BATCH_SIZE, - afterLatestBatchKey - ) - ) - ); - - if (logEntryDatasetsResponse._shards.total === 0) { - throw new NoLogAnalysisResultsIndexError( - `Failed to find ml result index for job ${logEntryCategoriesCountJobId}.` - ); - } - - const { - after_key: afterKey, - buckets: latestBatchBuckets, - } = logEntryDatasetsResponse.aggregations.dataset_buckets; - - logEntryDatasetBuckets = [...logEntryDatasetBuckets, ...latestBatchBuckets]; - afterLatestBatchKey = afterKey; - esSearchSpans = [...esSearchSpans, finalizeEsSearchSpan()]; + const jobIds = [logEntryCategoriesCountJobId]; - if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) { - break; - } - } - - const logEntryDatasetsSpan = finalizeLogEntryDatasetsSpan(); - - return { - data: 
logEntryDatasetBuckets.map((logEntryDatasetBucket) => logEntryDatasetBucket.key.dataset), - timing: { - spans: [logEntryDatasetsSpan, ...esSearchSpans], - }, - }; + return await getLogEntryDatasets(context.infra.mlSystem, startTime, endTime, jobIds); } export async function getLogEntryCategoryExamples(
diff --git a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_rate_analysis.ts b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_rate_analysis.ts index 971c35b54720c..7929dfd398695 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_rate_analysis.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_rate_analysis.ts @@ -44,7 +44,8 @@ export async function getLogEntryRateBuckets( sourceId: string, startTime: number, endTime: number, - bucketDuration: number + bucketDuration: number, + datasets?: string[] ) { const logRateJobId = getJobId(context.infra.spaceId, sourceId, 'log-entry-rate'); let mlModelPlotBuckets: LogRateModelPlotBucket[] = []; @@ -58,7 +59,8 @@ endTime, bucketDuration, COMPOSITE_AGGREGATION_BATCH_SIZE, - afterLatestBatchKey + afterLatestBatchKey, + datasets ) );
diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/common.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/common.ts index cd86cd846f370..43e968b892483 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/common.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/common.ts @@ -55,3 +55,14 @@ export const createCategoryIdFilters = (categoryIds: number[]) => [ }, }, ]; + +export const createDatasetsFilters = (datasets?: string[]) => + datasets && datasets.length > 0 + ? [ + { + terms: { + partition_field_value: datasets, + }, + }, + ] + : [];
diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_anomalies.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_anomalies.ts index d02d7bde9f340..ed02284ef394d 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_anomalies.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_anomalies.ts @@ -11,8 +11,13 @@ import { createTimeRangeFilters, createResultTypeFilters, defaultRequestParameters, + createDatasetsFilters, } from './common'; -import { Sort, Pagination } from '../../../../common/http_api/log_analysis'; +import { + Sort, + Pagination, + GetLogEntryAnomaliesRequestPayload, +} from '../../../../common/http_api/log_analysis'; // TODO: Reassess validity of this against ML docs const TIEBREAKER_FIELD = '_doc'; @@ -28,7 +33,8 @@ export const createLogEntryAnomaliesQuery = ( startTime: number, endTime: number, sort: Sort, - pagination: Pagination + pagination: Pagination, + datasets: GetLogEntryAnomaliesRequestPayload['data']['datasets'] ) => { const { field } = sort; const { pageSize } = pagination; @@ -37,6 +43,7 @@ ...createJobIdFilters(jobIds), ...createTimeRangeFilters(startTime, endTime), ...createResultTypeFilters(['record']), + ...createDatasetsFilters(datasets), ]; const sourceFields = [ 'job_id',
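The shared createDatasetsFilters helper only contributes a clause when a non-empty filter is supplied, so spreading it into a bool filter leaves unfiltered queries unchanged. A self-contained copy with sample outputs (the dataset names are illustrative):

const createDatasetsFilters = (datasets?: string[]) =>
  datasets && datasets.length > 0
    ? [{ terms: { partition_field_value: datasets } }]
    : [];

console.log(createDatasetsFilters()); // [] — spreads to nothing in the filter array
console.log(createDatasetsFilters([])); // []
console.log(createDatasetsFilters(['nginx.access', 'system.syslog']));
// [{ terms: { partition_field_value: ['nginx.access', 'system.syslog'] } }]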
diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_data_sets.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_data_sets.ts index 1b38a9173b402..7d7c784c46e85 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_data_sets.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_data_sets.ts @@ -7,14 +7,14 @@ import * as rt from 'io-ts'; import { commonSearchSuccessResponseFieldsRT } from '../../../utils/elasticsearch_runtime_types'; import { - createJobIdFilter, + createJobIdFilters, createResultTypeFilters, createTimeRangeFilters, defaultRequestParameters, } from './common'; export const createLogEntryDatasetsQuery = ( - logEntryAnalysisJobId: string, + jobIds: string[], startTime: number, endTime: number, size: number, @@ -25,7 +25,7 @@ query: { bool: { filter: [ - ...createJobIdFilter(logEntryAnalysisJobId), + ...createJobIdFilters(jobIds), ...createTimeRangeFilters(startTime, endTime), ...createResultTypeFilters(['model_plot']), ],
diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_rate.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_rate.ts index 861648c40f872..69231fef7b7cd 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_rate.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_rate.ts @@ -10,6 +10,7 @@ import { createResultTypeFilters, createTimeRangeFilters, defaultRequestParameters, + createDatasetsFilters, } from './common'; export const createLogEntryRateQuery = ( @@ -18,7 +19,8 @@ endTime: number, bucketDuration: number, size: number, - afterKey?: CompositeTimestampPartitionKey + afterKey?: CompositeTimestampPartitionKey, + datasets?: string[] ) => ({ ...defaultRequestParameters, body: { @@ -28,6 +30,7 @@ ...createJobIdFilter(logRateJobId), ...createTimeRangeFilters(startTime, endTime), ...createResultTypeFilters(['model_plot', 'record']), + ...createDatasetsFilters(datasets), { term: { detector_index: {
diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/top_log_entry_categories.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/top_log_entry_categories.ts index 0983e1fe4b886..77ab321bb7a9c 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/top_log_entry_categories.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/top_log_entry_categories.ts @@ -11,6 +11,7 @@ import { createResultTypeFilters, createTimeRangeFilters, defaultRequestParameters, + createDatasetsFilters, } from './common'; export const createTopLogEntryCategoriesQuery = ( @@ -122,17 +123,6 @@ size: 0, }); -const createDatasetsFilters = (datasets: string[]) => - datasets.length > 0 ?
[ - { - terms: { - partition_field_value: datasets, - }, - }, - ] - : []; - const metricAggregationRT = rt.type({ value: rt.union([rt.number, rt.null]), });
diff --git a/x-pack/plugins/infra/server/routes/log_analysis/results/index.ts b/x-pack/plugins/infra/server/routes/log_analysis/results/index.ts index cbd89db97236f..a01042616a872 100644 --- a/x-pack/plugins/infra/server/routes/log_analysis/results/index.ts +++ b/x-pack/plugins/infra/server/routes/log_analysis/results/index.ts @@ -10,3 +10,4 @@ export * from './log_entry_category_examples'; export * from './log_entry_rate'; export * from './log_entry_examples'; export * from './log_entry_anomalies'; +export * from './log_entry_anomalies_datasets';
diff --git a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_anomalies.ts b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_anomalies.ts index 8b3017f20a72f..3db54dc59613b 100644 --- a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_anomalies.ts +++ b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_anomalies.ts @@ -34,6 +34,7 @@ export const initGetLogEntryAnomaliesRoute = ({ framework }: InfraBackendLibs) => timeRange: { startTime, endTime }, sort: sortParam, pagination: paginationParam, + datasets, }, } = request.body; @@ -53,7 +54,8 @@ export const initGetLogEntryAnomaliesRoute = ({ framework }: InfraBackendLibs) => startTime, endTime, sort, - pagination + pagination, + datasets ); return response.ok({
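The new route's catch block applies the same precedence as the existing anomalies route: rethrow Boom errors for the framework to handle, map the known "no results index" error to a 404, and fall back to a custom error. A sketch of that mapping in isolation, with a minimal stand-in for the error class and response shape:

import Boom from 'boom';

class NoLogAnalysisResultsIndexError extends Error {}

const mapRouteError = (error: any) => {
  if (Boom.isBoom(error)) {
    throw error; // handled by the framework's legacy error wrapper
  }
  if (error instanceof NoLogAnalysisResultsIndexError) {
    return { statusCode: 404, body: { message: error.message } };
  }
  return {
    statusCode: error.statusCode ?? 500,
    body: { message: error.message ?? 'An unexpected error occurred' },
  };
};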
+ */ + +import Boom from 'boom'; +import { + getLogEntryAnomaliesDatasetsRequestPayloadRT, + getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT, + LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH, +} from '../../../../common/http_api/log_analysis'; +import { createValidationFunction } from '../../../../common/runtime_types'; +import type { InfraBackendLibs } from '../../../lib/infra_types'; +import { + getLogEntryAnomaliesDatasets, + NoLogAnalysisResultsIndexError, +} from '../../../lib/log_analysis'; +import { assertHasInfraMlPlugins } from '../../../utils/request_context'; + +export const initGetLogEntryAnomaliesDatasetsRoute = ({ framework }: InfraBackendLibs) => { + framework.registerRoute( + { + method: 'post', + path: LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH, + validate: { + body: createValidationFunction(getLogEntryAnomaliesDatasetsRequestPayloadRT), + }, + }, + framework.router.handleLegacyErrors(async (requestContext, request, response) => { + const { + data: { + sourceId, + timeRange: { startTime, endTime }, + }, + } = request.body; + + try { + assertHasInfraMlPlugins(requestContext); + + const { datasets, timing } = await getLogEntryAnomaliesDatasets( + requestContext, + sourceId, + startTime, + endTime + ); + + return response.ok({ + body: getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT.encode({ + data: { + datasets, + }, + timing, + }), + }); + } catch (error) { + if (Boom.isBoom(error)) { + throw error; + } + + if (error instanceof NoLogAnalysisResultsIndexError) { + return response.notFound({ body: { message: error.message } }); + } + + return response.customError({ + statusCode: error.statusCode ?? 500, + body: { + message: error.message ?? 'An unexpected error occurred', + }, + }); + } + }) + ); +}; diff --git a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_rate.ts b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_rate.ts index ae86102980c16..3b05f6ed23aae 100644 --- a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_rate.ts +++ b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_rate.ts @@ -27,7 +27,7 @@ export const initGetLogEntryRateRoute = ({ framework }: InfraBackendLibs) => { }, framework.router.handleLegacyErrors(async (requestContext, request, response) => { const { - data: { sourceId, timeRange, bucketDuration }, + data: { sourceId, timeRange, bucketDuration, datasets }, } = request.body; try { @@ -38,7 +38,8 @@ export const initGetLogEntryRateRoute = ({ framework }: InfraBackendLibs) => { sourceId, timeRange.startTime, timeRange.endTime, - bucketDuration + bucketDuration, + datasets ); return response.ok({