diff --git a/x-pack/plugins/infra/common/http_api/log_analysis/results/index.ts b/x-pack/plugins/infra/common/http_api/log_analysis/results/index.ts index cbd89db97236f..a01042616a872 100644 --- a/x-pack/plugins/infra/common/http_api/log_analysis/results/index.ts +++ b/x-pack/plugins/infra/common/http_api/log_analysis/results/index.ts @@ -10,3 +10,4 @@ export * from './log_entry_category_examples'; export * from './log_entry_rate'; export * from './log_entry_examples'; export * from './log_entry_anomalies'; +export * from './log_entry_anomalies_datasets'; diff --git a/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies.ts b/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies.ts index 639ac63f9b14d..62b76a0ae475e 100644 --- a/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies.ts +++ b/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies.ts @@ -128,6 +128,8 @@ export const getLogEntryAnomaliesRequestPayloadRT = rt.type({ pagination: paginationRT, // Sort properties sort: sortRT, + // Dataset filters + datasets: rt.array(rt.string), }), ]), }); diff --git a/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies_datasets.ts b/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies_datasets.ts new file mode 100644 index 0000000000000..56784dba1be44 --- /dev/null +++ b/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_anomalies_datasets.ts @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import * as rt from 'io-ts'; + +import { + badRequestErrorRT, + forbiddenErrorRT, + timeRangeRT, + routeTimingMetadataRT, +} from '../../shared'; + +export const LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH = + '/api/infra/log_analysis/results/log_entry_anomalies_datasets'; + +/** + * request + */ + +export const getLogEntryAnomaliesDatasetsRequestPayloadRT = rt.type({ + data: rt.type({ + // the id of the source configuration + sourceId: rt.string, + // the time range to fetch the anomalies datasets from + timeRange: timeRangeRT, + }), +}); + +export type GetLogEntryAnomaliesDatasetsRequestPayload = rt.TypeOf< + typeof getLogEntryAnomaliesDatasetsRequestPayloadRT +>; + +/** + * response + */ + +export const getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT = rt.intersection([ + rt.type({ + data: rt.type({ + datasets: rt.array(rt.string), + }), + }), + rt.partial({ + timing: routeTimingMetadataRT, + }), +]); + +export type GetLogEntryAnomaliesDatasetsSuccessResponsePayload = rt.TypeOf< + typeof getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT +>; + +export const getLogEntryAnomaliesDatasetsResponsePayloadRT = rt.union([ + getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT, + badRequestErrorRT, + forbiddenErrorRT, +]); + +export type GetLogEntryAnomaliesDatasetsReponsePayload = rt.TypeOf< + typeof getLogEntryAnomaliesDatasetsResponsePayloadRT +>; diff --git a/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_rate.ts b/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_rate.ts index b7e8a49735152..20a8e5c378cec 100644 --- a/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_rate.ts +++ b/x-pack/plugins/infra/common/http_api/log_analysis/results/log_entry_rate.ts @@ -16,11 +16,16 @@ export const LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH = */ export const getLogEntryRateRequestPayloadRT = rt.type({ - data: rt.type({ - bucketDuration: rt.number, - sourceId: rt.string, - timeRange: timeRangeRT, - }), + data: rt.intersection([ + rt.type({ + bucketDuration: rt.number, + sourceId: rt.string, + timeRange: timeRangeRT, + }), + rt.partial({ + datasets: rt.array(rt.string), + }), + ]), }); export type GetLogEntryRateRequestPayload = rt.TypeOf; diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/datasets_selector.tsx b/x-pack/plugins/infra/public/components/logging/log_analysis_results/datasets_selector.tsx similarity index 92% rename from x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/datasets_selector.tsx rename to x-pack/plugins/infra/public/components/logging/log_analysis_results/datasets_selector.tsx index ab938ff1d1374..2236dc9e45da6 100644 --- a/x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/datasets_selector.tsx +++ b/x-pack/plugins/infra/public/components/logging/log_analysis_results/datasets_selector.tsx @@ -8,7 +8,7 @@ import { EuiComboBox, EuiComboBoxOptionOption } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; import React, { useCallback, useMemo } from 'react'; -import { getFriendlyNameForPartitionId } from '../../../../../../common/log_analysis'; +import { getFriendlyNameForPartitionId } from '../../../../common/log_analysis'; type DatasetOptionProps = EuiComboBoxOptionOption; @@ -51,7 +51,7 @@ export const DatasetsSelector: React.FunctionComponent<{ }; const datasetFilterPlaceholder = i18n.translate( - 'xpack.infra.logs.logEntryCategories.datasetFilterPlaceholder', + 'xpack.infra.logs.analysis.datasetFilterPlaceholder', { defaultMessage: 'Filter by datasets', } diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/top_categories_section.tsx b/x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/top_categories_section.tsx index 37d26de6fce70..ea23bc468bc76 100644 --- a/x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/top_categories_section.tsx +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_categories/sections/top_categories/top_categories_section.tsx @@ -14,7 +14,7 @@ import { BetaBadge } from '../../../../../components/beta_badge'; import { LoadingOverlayWrapper } from '../../../../../components/loading_overlay_wrapper'; import { RecreateJobButton } from '../../../../../components/logging/log_analysis_job_status'; import { AnalyzeInMlButton } from '../../../../../components/logging/log_analysis_results'; -import { DatasetsSelector } from './datasets_selector'; +import { DatasetsSelector } from '../../../../../components/logging/log_analysis_results/datasets_selector'; import { TopCategoriesTable } from './top_categories_table'; export const TopCategoriesSection: React.FunctionComponent<{ diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/page_results_content.tsx b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/page_results_content.tsx index 21c3e3ec70029..800db1aa1332d 100644 --- a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/page_results_content.tsx +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/page_results_content.tsx @@ -21,6 +21,7 @@ import { StringTimeRange, useLogAnalysisResultsUrlState, } from './use_log_entry_rate_results_url_state'; +import { DatasetsSelector } from '../../../components/logging/log_analysis_results/datasets_selector'; const JOB_STATUS_POLLING_INTERVAL = 30000; @@ -75,11 +76,14 @@ export const LogEntryRateResultsContent: React.FunctionComponent([]); + const { getLogEntryRate, isLoading, logEntryRate } = useLogEntryRateResults({ sourceId, startTime: queryTimeRange.value.startTime, endTime: queryTimeRange.value.endTime, bucketDuration, + filteredDatasets: selectedDatasets, }); const { @@ -92,12 +96,15 @@ export const LogEntryRateResultsContent: React.FunctionComponent { getLogEntryRate(); - }, [getLogEntryRate, queryTimeRange.lastChangedTime]); + }, [getLogEntryRate, selectedDatasets, queryTimeRange.lastChangedTime]); useEffect(() => { fetchModuleDefinition(); @@ -194,7 +201,15 @@ export const LogEntryRateResultsContent: React.FunctionComponent - + + + + { const response = await npStart.http.fetch(LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_PATH, { method: 'POST', @@ -32,6 +33,7 @@ export const callGetLogEntryAnomaliesAPI = async ( }, sort, pagination, + datasets, }, }) ), diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_anomalies_datasets.ts b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_anomalies_datasets.ts new file mode 100644 index 0000000000000..24be5a646d103 --- /dev/null +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_anomalies_datasets.ts @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { npStart } from '../../../../legacy_singletons'; +import { decodeOrThrow } from '../../../../../common/runtime_types'; +import { + getLogEntryAnomaliesDatasetsRequestPayloadRT, + getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT, + LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH, +} from '../../../../../common/http_api/log_analysis'; + +export const callGetLogEntryAnomaliesDatasetsAPI = async ( + sourceId: string, + startTime: number, + endTime: number +) => { + const response = await npStart.http.fetch(LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH, { + method: 'POST', + body: JSON.stringify( + getLogEntryAnomaliesDatasetsRequestPayloadRT.encode({ + data: { + sourceId, + timeRange: { + startTime, + endTime, + }, + }, + }) + ), + }); + + return decodeOrThrow(getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT)(response); +}; diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_rate.ts b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_rate.ts index 794139385f467..77111d279309d 100644 --- a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_rate.ts +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/service_calls/get_log_entry_rate.ts @@ -19,7 +19,8 @@ export const callGetLogEntryRateAPI = async ( sourceId: string, startTime: number, endTime: number, - bucketDuration: number + bucketDuration: number, + datasets?: string[] ) => { const response = await npStart.http.fetch(LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH, { method: 'POST', @@ -32,6 +33,7 @@ export const callGetLogEntryRateAPI = async ( endTime, }, bucketDuration, + datasets, }, }) ), diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_anomalies_results.ts b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_anomalies_results.ts index cadb4c420c133..52632e54390a9 100644 --- a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_anomalies_results.ts +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_anomalies_results.ts @@ -5,11 +5,17 @@ */ import { useMemo, useState, useCallback, useEffect, useReducer } from 'react'; - -import { LogEntryAnomaly } from '../../../../common/http_api'; -import { useTrackedPromise } from '../../../utils/use_tracked_promise'; +import { useMount } from 'react-use'; +import { useTrackedPromise, CanceledPromiseError } from '../../../utils/use_tracked_promise'; import { callGetLogEntryAnomaliesAPI } from './service_calls/get_log_entry_anomalies'; -import { Sort, Pagination, PaginationCursor } from '../../../../common/http_api/log_analysis'; +import { callGetLogEntryAnomaliesDatasetsAPI } from './service_calls/get_log_entry_anomalies_datasets'; +import { + Sort, + Pagination, + PaginationCursor, + GetLogEntryAnomaliesDatasetsSuccessResponsePayload, + LogEntryAnomaly, +} from '../../../../common/http_api/log_analysis'; export type SortOptions = Sort; export type PaginationOptions = Pick; @@ -19,6 +25,7 @@ export type FetchPreviousPage = () => void; export type ChangeSortOptions = (sortOptions: Sort) => void; export type ChangePaginationOptions = (paginationOptions: PaginationOptions) => void; export type LogEntryAnomalies = LogEntryAnomaly[]; +type LogEntryAnomaliesDatasets = GetLogEntryAnomaliesDatasetsSuccessResponsePayload['data']['datasets']; interface PaginationCursors { previousPageCursor: PaginationCursor; nextPageCursor: PaginationCursor; @@ -35,6 +42,7 @@ interface ReducerState { start: number; end: number; }; + filteredDatasets?: string[]; } type ReducerStateDefaults = Pick< @@ -49,7 +57,8 @@ type ReducerAction = | { type: 'fetchPreviousPage' } | { type: 'changeHasNextPage'; payload: { hasNextPage: boolean } } | { type: 'changeLastReceivedCursors'; payload: { lastReceivedCursors: PaginationCursors } } - | { type: 'changeTimeRange'; payload: { timeRange: { start: number; end: number } } }; + | { type: 'changeTimeRange'; payload: { timeRange: { start: number; end: number } } } + | { type: 'changeFilteredDatasets'; payload: { filteredDatasets?: string[] } }; const stateReducer = (state: ReducerState, action: ReducerAction): ReducerState => { const resetPagination = { @@ -101,6 +110,12 @@ const stateReducer = (state: ReducerState, action: ReducerAction): ReducerState ...resetPagination, ...action.payload, }; + case 'changeFilteredDatasets': + return { + ...state, + ...resetPagination, + ...action.payload, + }; default: return state; } @@ -122,18 +137,23 @@ export const useLogEntryAnomaliesResults = ({ sourceId, defaultSortOptions, defaultPaginationOptions, + onGetLogEntryAnomaliesDatasetsError, + filteredDatasets, }: { endTime: number; startTime: number; sourceId: string; defaultSortOptions: Sort; defaultPaginationOptions: Pick; + onGetLogEntryAnomaliesDatasetsError?: (error: Error) => void; + filteredDatasets?: string[]; }) => { const initStateReducer = (stateDefaults: ReducerStateDefaults): ReducerState => { return { ...stateDefaults, paginationOptions: defaultPaginationOptions, sortOptions: defaultSortOptions, + filteredDatasets, timeRange: { start: startTime, end: endTime, @@ -154,6 +174,7 @@ export const useLogEntryAnomaliesResults = ({ sortOptions, paginationOptions, paginationCursor, + filteredDatasets: queryFilteredDatasets, } = reducerState; return await callGetLogEntryAnomaliesAPI( sourceId, @@ -163,7 +184,8 @@ export const useLogEntryAnomaliesResults = ({ { ...paginationOptions, cursor: paginationCursor, - } + }, + queryFilteredDatasets ); }, onResolve: ({ data: { anomalies, paginationCursors: requestCursors, hasMoreEntries } }) => { @@ -192,6 +214,7 @@ export const useLogEntryAnomaliesResults = ({ reducerState.sortOptions, reducerState.paginationOptions, reducerState.paginationCursor, + reducerState.filteredDatasets, ] ); @@ -220,6 +243,14 @@ export const useLogEntryAnomaliesResults = ({ }); }, [startTime, endTime]); + // Selected datasets have changed + useEffect(() => { + dispatch({ + type: 'changeFilteredDatasets', + payload: { filteredDatasets }, + }); + }, [filteredDatasets]); + useEffect(() => { getLogEntryAnomalies(); }, [getLogEntryAnomalies]); @@ -246,10 +277,53 @@ export const useLogEntryAnomaliesResults = ({ [getLogEntryAnomaliesRequest.state] ); + // Anomalies datasets + const [logEntryAnomaliesDatasets, setLogEntryAnomaliesDatasets] = useState< + LogEntryAnomaliesDatasets + >([]); + + const [getLogEntryAnomaliesDatasetsRequest, getLogEntryAnomaliesDatasets] = useTrackedPromise( + { + cancelPreviousOn: 'creation', + createPromise: async () => { + return await callGetLogEntryAnomaliesDatasetsAPI(sourceId, startTime, endTime); + }, + onResolve: ({ data: { datasets } }) => { + setLogEntryAnomaliesDatasets(datasets); + }, + onReject: (error) => { + if ( + error instanceof Error && + !(error instanceof CanceledPromiseError) && + onGetLogEntryAnomaliesDatasetsError + ) { + onGetLogEntryAnomaliesDatasetsError(error); + } + }, + }, + [endTime, sourceId, startTime] + ); + + const isLoadingDatasets = useMemo(() => getLogEntryAnomaliesDatasetsRequest.state === 'pending', [ + getLogEntryAnomaliesDatasetsRequest.state, + ]); + + const hasFailedLoadingDatasets = useMemo( + () => getLogEntryAnomaliesDatasetsRequest.state === 'rejected', + [getLogEntryAnomaliesDatasetsRequest.state] + ); + + useMount(() => { + getLogEntryAnomaliesDatasets(); + }); + return { logEntryAnomalies, getLogEntryAnomalies, isLoadingLogEntryAnomalies, + isLoadingDatasets, + hasFailedLoadingDatasets, + datasets: logEntryAnomaliesDatasets, hasFailedLoadingLogEntryAnomalies, changeSortOptions, sortOptions: reducerState.sortOptions, diff --git a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_rate_results.ts b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_rate_results.ts index 1cd27c64af53f..a52dab58cb018 100644 --- a/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_rate_results.ts +++ b/x-pack/plugins/infra/public/pages/logs/log_entry_rate/use_log_entry_rate_results.ts @@ -41,11 +41,13 @@ export const useLogEntryRateResults = ({ startTime, endTime, bucketDuration = 15 * 60 * 1000, + filteredDatasets, }: { sourceId: string; startTime: number; endTime: number; bucketDuration: number; + filteredDatasets?: string[]; }) => { const [logEntryRate, setLogEntryRate] = useState(null); @@ -53,7 +55,13 @@ export const useLogEntryRateResults = ({ { cancelPreviousOn: 'resolution', createPromise: async () => { - return await callGetLogEntryRateAPI(sourceId, startTime, endTime, bucketDuration); + return await callGetLogEntryRateAPI( + sourceId, + startTime, + endTime, + bucketDuration, + filteredDatasets + ); }, onResolve: ({ data }) => { setLogEntryRate({ @@ -68,7 +76,7 @@ export const useLogEntryRateResults = ({ setLogEntryRate(null); }, }, - [sourceId, startTime, endTime, bucketDuration] + [sourceId, startTime, endTime, bucketDuration, filteredDatasets] ); const isLoading = useMemo(() => getLogEntryRateRequest.state === 'pending', [ diff --git a/x-pack/plugins/infra/server/infra_server.ts b/x-pack/plugins/infra/server/infra_server.ts index 6596e07ebaca5..c080618f2a563 100644 --- a/x-pack/plugins/infra/server/infra_server.ts +++ b/x-pack/plugins/infra/server/infra_server.ts @@ -19,6 +19,7 @@ import { initValidateLogAnalysisDatasetsRoute, initValidateLogAnalysisIndicesRoute, initGetLogEntryAnomaliesRoute, + initGetLogEntryAnomaliesDatasetsRoute, } from './routes/log_analysis'; import { initMetricExplorerRoute } from './routes/metrics_explorer'; import { initMetadataRoute } from './routes/metadata'; @@ -53,6 +54,7 @@ export const initInfraServer = (libs: InfraBackendLibs) => { initGetLogEntryCategoryExamplesRoute(libs); initGetLogEntryRateRoute(libs); initGetLogEntryAnomaliesRoute(libs); + initGetLogEntryAnomaliesDatasetsRoute(libs); initSnapshotRoute(libs); initNodeDetailsRoute(libs); initSourceRoute(libs); diff --git a/x-pack/plugins/infra/server/lib/log_analysis/common.ts b/x-pack/plugins/infra/server/lib/log_analysis/common.ts index 0c0b0a0f19982..218281d875a46 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/common.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/common.ts @@ -4,10 +4,19 @@ * you may not use this file except in compliance with the Elastic License. */ -import type { MlAnomalyDetectors } from '../../types'; -import { startTracingSpan } from '../../../common/performance_tracing'; +import type { MlAnomalyDetectors, MlSystem } from '../../types'; import { NoLogAnalysisMlJobError } from './errors'; +import { + CompositeDatasetKey, + createLogEntryDatasetsQuery, + LogEntryDatasetBucket, + logEntryDatasetsResponseRT, +} from './queries/log_entry_data_sets'; +import { decodeOrThrow } from '../../../common/runtime_types'; +import { NoLogAnalysisResultsIndexError } from './errors'; +import { startTracingSpan, TracingSpan } from '../../../common/performance_tracing'; + export async function fetchMlJob(mlAnomalyDetectors: MlAnomalyDetectors, jobId: string) { const finalizeMlGetJobSpan = startTracingSpan('Fetch ml job from ES'); const { @@ -27,3 +36,63 @@ export async function fetchMlJob(mlAnomalyDetectors: MlAnomalyDetectors, jobId: }, }; } + +const COMPOSITE_AGGREGATION_BATCH_SIZE = 1000; + +// Finds datasets related to ML job ids +export async function getLogEntryDatasets( + mlSystem: MlSystem, + startTime: number, + endTime: number, + jobIds: string[] +) { + const finalizeLogEntryDatasetsSpan = startTracingSpan('get data sets'); + + let logEntryDatasetBuckets: LogEntryDatasetBucket[] = []; + let afterLatestBatchKey: CompositeDatasetKey | undefined; + let esSearchSpans: TracingSpan[] = []; + + while (true) { + const finalizeEsSearchSpan = startTracingSpan('fetch log entry dataset batch from ES'); + + const logEntryDatasetsResponse = decodeOrThrow(logEntryDatasetsResponseRT)( + await mlSystem.mlAnomalySearch( + createLogEntryDatasetsQuery( + jobIds, + startTime, + endTime, + COMPOSITE_AGGREGATION_BATCH_SIZE, + afterLatestBatchKey + ) + ) + ); + + if (logEntryDatasetsResponse._shards.total === 0) { + throw new NoLogAnalysisResultsIndexError( + `Failed to find ml indices for jobs: ${jobIds.join(', ')}.` + ); + } + + const { + after_key: afterKey, + buckets: latestBatchBuckets, + } = logEntryDatasetsResponse.aggregations.dataset_buckets; + + logEntryDatasetBuckets = [...logEntryDatasetBuckets, ...latestBatchBuckets]; + afterLatestBatchKey = afterKey; + esSearchSpans = [...esSearchSpans, finalizeEsSearchSpan()]; + + if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) { + break; + } + } + + const logEntryDatasetsSpan = finalizeLogEntryDatasetsSpan(); + + return { + data: logEntryDatasetBuckets.map((logEntryDatasetBucket) => logEntryDatasetBucket.key.dataset), + timing: { + spans: [logEntryDatasetsSpan, ...esSearchSpans], + }, + }; +} diff --git a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_anomalies.ts b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_anomalies.ts index 12ae516564d66..950de4261bda0 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_anomalies.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_anomalies.ts @@ -7,15 +7,19 @@ import { RequestHandlerContext } from 'src/core/server'; import { InfraRequestHandlerContext } from '../../types'; import { TracingSpan, startTracingSpan } from '../../../common/performance_tracing'; -import { fetchMlJob } from './common'; +import { fetchMlJob, getLogEntryDatasets } from './common'; import { getJobId, logEntryCategoriesJobTypes, logEntryRateJobTypes, jobCustomSettingsRT, } from '../../../common/log_analysis'; -import { Sort, Pagination } from '../../../common/http_api/log_analysis'; -import type { MlSystem } from '../../types'; +import { + Sort, + Pagination, + GetLogEntryAnomaliesRequestPayload, +} from '../../../common/http_api/log_analysis'; +import type { MlSystem, MlAnomalyDetectors } from '../../types'; import { createLogEntryAnomaliesQuery, logEntryAnomaliesResponseRT } from './queries'; import { InsufficientAnomalyMlJobsConfigured, @@ -43,22 +47,13 @@ interface MappedAnomalyHit { categoryId?: string; } -export async function getLogEntryAnomalies( - context: RequestHandlerContext & { infra: Required }, +async function getCompatibleAnomaliesJobIds( + spaceId: string, sourceId: string, - startTime: number, - endTime: number, - sort: Sort, - pagination: Pagination + mlAnomalyDetectors: MlAnomalyDetectors ) { - const finalizeLogEntryAnomaliesSpan = startTracingSpan('get log entry anomalies'); - - const logRateJobId = getJobId(context.infra.spaceId, sourceId, logEntryRateJobTypes[0]); - const logCategoriesJobId = getJobId( - context.infra.spaceId, - sourceId, - logEntryCategoriesJobTypes[0] - ); + const logRateJobId = getJobId(spaceId, sourceId, logEntryRateJobTypes[0]); + const logCategoriesJobId = getJobId(spaceId, sourceId, logEntryCategoriesJobTypes[0]); const jobIds: string[] = []; let jobSpans: TracingSpan[] = []; @@ -66,7 +61,7 @@ export async function getLogEntryAnomalies( try { const { timing: { spans }, - } = await fetchMlJob(context.infra.mlAnomalyDetectors, logRateJobId); + } = await fetchMlJob(mlAnomalyDetectors, logRateJobId); jobIds.push(logRateJobId); jobSpans = [...jobSpans, ...spans]; } catch (e) { @@ -76,13 +71,39 @@ export async function getLogEntryAnomalies( try { const { timing: { spans }, - } = await fetchMlJob(context.infra.mlAnomalyDetectors, logCategoriesJobId); + } = await fetchMlJob(mlAnomalyDetectors, logCategoriesJobId); jobIds.push(logCategoriesJobId); jobSpans = [...jobSpans, ...spans]; } catch (e) { // Job wasn't found } + return { + jobIds, + timing: { spans: jobSpans }, + }; +} + +export async function getLogEntryAnomalies( + context: RequestHandlerContext & { infra: Required }, + sourceId: string, + startTime: number, + endTime: number, + sort: Sort, + pagination: Pagination, + datasets: GetLogEntryAnomaliesRequestPayload['data']['datasets'] +) { + const finalizeLogEntryAnomaliesSpan = startTracingSpan('get log entry anomalies'); + + const { + jobIds, + timing: { spans: jobSpans }, + } = await getCompatibleAnomaliesJobIds( + context.infra.spaceId, + sourceId, + context.infra.mlAnomalyDetectors + ); + if (jobIds.length === 0) { throw new InsufficientAnomalyMlJobsConfigured( 'Log rate or categorisation ML jobs need to be configured to search anomalies' @@ -100,16 +121,17 @@ export async function getLogEntryAnomalies( startTime, endTime, sort, - pagination + pagination, + datasets ); const data = anomalies.map((anomaly) => { const { jobId } = anomaly; - if (jobId === logRateJobId) { - return parseLogRateAnomalyResult(anomaly, logRateJobId); + if (!anomaly.categoryId) { + return parseLogRateAnomalyResult(anomaly, jobId); } else { - return parseCategoryAnomalyResult(anomaly, logCategoriesJobId); + return parseCategoryAnomalyResult(anomaly, jobId); } }); @@ -181,7 +203,8 @@ async function fetchLogEntryAnomalies( startTime: number, endTime: number, sort: Sort, - pagination: Pagination + pagination: Pagination, + datasets: GetLogEntryAnomaliesRequestPayload['data']['datasets'] ) { // We'll request 1 extra entry on top of our pageSize to determine if there are // more entries to be fetched. This avoids scenarios where the client side can't @@ -193,7 +216,7 @@ async function fetchLogEntryAnomalies( const results = decodeOrThrow(logEntryAnomaliesResponseRT)( await mlSystem.mlAnomalySearch( - createLogEntryAnomaliesQuery(jobIds, startTime, endTime, sort, expandedPagination) + createLogEntryAnomaliesQuery(jobIds, startTime, endTime, sort, expandedPagination, datasets) ) ); @@ -396,3 +419,43 @@ export async function fetchLogEntryExamples( }, }; } + +export async function getLogEntryAnomaliesDatasets( + context: { + infra: { + mlSystem: MlSystem; + mlAnomalyDetectors: MlAnomalyDetectors; + spaceId: string; + }; + }, + sourceId: string, + startTime: number, + endTime: number +) { + const { + jobIds, + timing: { spans: jobSpans }, + } = await getCompatibleAnomaliesJobIds( + context.infra.spaceId, + sourceId, + context.infra.mlAnomalyDetectors + ); + + if (jobIds.length === 0) { + throw new InsufficientAnomalyMlJobsConfigured( + 'Log rate or categorisation ML jobs need to be configured to search for anomaly datasets' + ); + } + + const { + data: datasets, + timing: { spans: datasetsSpans }, + } = await getLogEntryDatasets(context.infra.mlSystem, startTime, endTime, jobIds); + + return { + datasets, + timing: { + spans: [...jobSpans, ...datasetsSpans], + }, + }; +} diff --git a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_categories_analysis.ts b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_categories_analysis.ts index 6d00ba56e0e66..a455a03d936a5 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_categories_analysis.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_categories_analysis.ts @@ -12,7 +12,7 @@ import { jobCustomSettingsRT, logEntryCategoriesJobTypes, } from '../../../common/log_analysis'; -import { startTracingSpan, TracingSpan } from '../../../common/performance_tracing'; +import { startTracingSpan } from '../../../common/performance_tracing'; import { decodeOrThrow } from '../../../common/runtime_types'; import type { MlAnomalyDetectors, MlSystem } from '../../types'; import { @@ -33,20 +33,12 @@ import { createLogEntryCategoryHistogramsQuery, logEntryCategoryHistogramsResponseRT, } from './queries/log_entry_category_histograms'; -import { - CompositeDatasetKey, - createLogEntryDatasetsQuery, - LogEntryDatasetBucket, - logEntryDatasetsResponseRT, -} from './queries/log_entry_data_sets'; import { createTopLogEntryCategoriesQuery, topLogEntryCategoriesResponseRT, } from './queries/top_log_entry_categories'; import { InfraSource } from '../sources'; -import { fetchMlJob } from './common'; - -const COMPOSITE_AGGREGATION_BATCH_SIZE = 1000; +import { fetchMlJob, getLogEntryDatasets } from './common'; export async function getTopLogEntryCategories( context: { @@ -129,61 +121,15 @@ export async function getLogEntryCategoryDatasets( startTime: number, endTime: number ) { - const finalizeLogEntryDatasetsSpan = startTracingSpan('get data sets'); - const logEntryCategoriesCountJobId = getJobId( context.infra.spaceId, sourceId, logEntryCategoriesJobTypes[0] ); - let logEntryDatasetBuckets: LogEntryDatasetBucket[] = []; - let afterLatestBatchKey: CompositeDatasetKey | undefined; - let esSearchSpans: TracingSpan[] = []; - - while (true) { - const finalizeEsSearchSpan = startTracingSpan('fetch category dataset batch from ES'); - - const logEntryDatasetsResponse = decodeOrThrow(logEntryDatasetsResponseRT)( - await context.infra.mlSystem.mlAnomalySearch( - createLogEntryDatasetsQuery( - logEntryCategoriesCountJobId, - startTime, - endTime, - COMPOSITE_AGGREGATION_BATCH_SIZE, - afterLatestBatchKey - ) - ) - ); - - if (logEntryDatasetsResponse._shards.total === 0) { - throw new NoLogAnalysisResultsIndexError( - `Failed to find ml result index for job ${logEntryCategoriesCountJobId}.` - ); - } - - const { - after_key: afterKey, - buckets: latestBatchBuckets, - } = logEntryDatasetsResponse.aggregations.dataset_buckets; + const jobIds = [logEntryCategoriesCountJobId]; - logEntryDatasetBuckets = [...logEntryDatasetBuckets, ...latestBatchBuckets]; - afterLatestBatchKey = afterKey; - esSearchSpans = [...esSearchSpans, finalizeEsSearchSpan()]; - - if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) { - break; - } - } - - const logEntryDatasetsSpan = finalizeLogEntryDatasetsSpan(); - - return { - data: logEntryDatasetBuckets.map((logEntryDatasetBucket) => logEntryDatasetBucket.key.dataset), - timing: { - spans: [logEntryDatasetsSpan, ...esSearchSpans], - }, - }; + return await getLogEntryDatasets(context.infra.mlSystem, startTime, endTime, jobIds); } export async function getLogEntryCategoryExamples( diff --git a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_rate_analysis.ts b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_rate_analysis.ts index 0323980dcd013..7bfc85ba78a0e 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_rate_analysis.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_rate_analysis.ts @@ -30,7 +30,8 @@ export async function getLogEntryRateBuckets( sourceId: string, startTime: number, endTime: number, - bucketDuration: number + bucketDuration: number, + datasets?: string[] ) { const logRateJobId = getJobId(context.infra.spaceId, sourceId, 'log-entry-rate'); let mlModelPlotBuckets: LogRateModelPlotBucket[] = []; @@ -44,7 +45,8 @@ export async function getLogEntryRateBuckets( endTime, bucketDuration, COMPOSITE_AGGREGATION_BATCH_SIZE, - afterLatestBatchKey + afterLatestBatchKey, + datasets ) ); diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/common.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/common.ts index 87394028095de..63e39ef022392 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/common.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/common.ts @@ -55,3 +55,14 @@ export const createCategoryIdFilters = (categoryIds: number[]) => [ }, }, ]; + +export const createDatasetsFilters = (datasets?: string[]) => + datasets && datasets.length > 0 + ? [ + { + terms: { + partition_field_value: datasets, + }, + }, + ] + : []; diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_anomalies.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_anomalies.ts index fc72776ea5cac..c722544c509aa 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_anomalies.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_anomalies.ts @@ -11,8 +11,13 @@ import { createTimeRangeFilters, createResultTypeFilters, defaultRequestParameters, + createDatasetsFilters, } from './common'; -import { Sort, Pagination } from '../../../../common/http_api/log_analysis'; +import { + Sort, + Pagination, + GetLogEntryAnomaliesRequestPayload, +} from '../../../../common/http_api/log_analysis'; // TODO: Reassess validity of this against ML docs const TIEBREAKER_FIELD = '_doc'; @@ -28,7 +33,8 @@ export const createLogEntryAnomaliesQuery = ( startTime: number, endTime: number, sort: Sort, - pagination: Pagination + pagination: Pagination, + datasets: GetLogEntryAnomaliesRequestPayload['data']['datasets'] ) => { const { field } = sort; const { pageSize } = pagination; @@ -37,6 +43,7 @@ export const createLogEntryAnomaliesQuery = ( ...createJobIdsFilters(jobIds), ...createTimeRangeFilters(startTime, endTime), ...createResultTypeFilters(['record']), + ...createDatasetsFilters(datasets), ]; const sourceFields = [ diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_data_sets.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_data_sets.ts index dd22bedae8b2a..7627ccd8c4996 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_data_sets.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_data_sets.ts @@ -7,14 +7,14 @@ import * as rt from 'io-ts'; import { commonSearchSuccessResponseFieldsRT } from '../../../utils/elasticsearch_runtime_types'; import { - createJobIdFilters, + createJobIdsFilters, createResultTypeFilters, createTimeRangeFilters, defaultRequestParameters, } from './common'; export const createLogEntryDatasetsQuery = ( - logEntryAnalysisJobId: string, + jobIds: string[], startTime: number, endTime: number, size: number, @@ -25,7 +25,7 @@ export const createLogEntryDatasetsQuery = ( query: { bool: { filter: [ - ...createJobIdFilters(logEntryAnalysisJobId), + ...createJobIdsFilters(jobIds), ...createTimeRangeFilters(startTime, endTime), ...createResultTypeFilters(['model_plot']), ], diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_rate.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_rate.ts index 8d9c586b2ef67..52edcf09cdfc2 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_rate.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_rate.ts @@ -10,6 +10,7 @@ import { createResultTypeFilters, createTimeRangeFilters, defaultRequestParameters, + createDatasetsFilters, } from './common'; export const createLogEntryRateQuery = ( @@ -18,7 +19,8 @@ export const createLogEntryRateQuery = ( endTime: number, bucketDuration: number, size: number, - afterKey?: CompositeTimestampPartitionKey + afterKey?: CompositeTimestampPartitionKey, + datasets?: string[] ) => ({ ...defaultRequestParameters, body: { @@ -28,6 +30,7 @@ export const createLogEntryRateQuery = ( ...createJobIdFilters(logRateJobId), ...createTimeRangeFilters(startTime, endTime), ...createResultTypeFilters(['model_plot', 'record']), + ...createDatasetsFilters(datasets), { term: { detector_index: { diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/top_log_entry_categories.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/top_log_entry_categories.ts index 6fa7156240508..355dde9ec7c4a 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/top_log_entry_categories.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/top_log_entry_categories.ts @@ -11,6 +11,7 @@ import { createResultTypeFilters, createTimeRangeFilters, defaultRequestParameters, + createDatasetsFilters, } from './common'; export const createTopLogEntryCategoriesQuery = ( @@ -122,17 +123,6 @@ export const createTopLogEntryCategoriesQuery = ( size: 0, }); -const createDatasetsFilters = (datasets: string[]) => - datasets.length > 0 - ? [ - { - terms: { - partition_field_value: datasets, - }, - }, - ] - : []; - const metricAggregationRT = rt.type({ value: rt.union([rt.number, rt.null]), }); diff --git a/x-pack/plugins/infra/server/routes/log_analysis/results/index.ts b/x-pack/plugins/infra/server/routes/log_analysis/results/index.ts index cbd89db97236f..a01042616a872 100644 --- a/x-pack/plugins/infra/server/routes/log_analysis/results/index.ts +++ b/x-pack/plugins/infra/server/routes/log_analysis/results/index.ts @@ -10,3 +10,4 @@ export * from './log_entry_category_examples'; export * from './log_entry_rate'; export * from './log_entry_examples'; export * from './log_entry_anomalies'; +export * from './log_entry_anomalies_datasets'; diff --git a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_anomalies.ts b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_anomalies.ts index f4911658ea496..d79c9b9dd2c78 100644 --- a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_anomalies.ts +++ b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_anomalies.ts @@ -34,6 +34,7 @@ export const initGetLogEntryAnomaliesRoute = ({ framework }: InfraBackendLibs) = timeRange: { startTime, endTime }, sort: sortParam, pagination: paginationParam, + datasets, }, } = request.body; @@ -53,7 +54,8 @@ export const initGetLogEntryAnomaliesRoute = ({ framework }: InfraBackendLibs) = startTime, endTime, sort, - pagination + pagination, + datasets ); return response.ok({ diff --git a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_anomalies_datasets.ts b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_anomalies_datasets.ts new file mode 100644 index 0000000000000..d3d0862eee9aa --- /dev/null +++ b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_anomalies_datasets.ts @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import Boom from 'boom'; +import { + getLogEntryAnomaliesDatasetsRequestPayloadRT, + getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT, + LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH, +} from '../../../../common/http_api/log_analysis'; +import { createValidationFunction } from '../../../../common/runtime_types'; +import type { InfraBackendLibs } from '../../../lib/infra_types'; +import { + getLogEntryAnomaliesDatasets, + NoLogAnalysisResultsIndexError, +} from '../../../lib/log_analysis'; +import { assertHasInfraMlPlugins } from '../../../utils/request_context'; + +export const initGetLogEntryAnomaliesDatasetsRoute = ({ framework }: InfraBackendLibs) => { + framework.registerRoute( + { + method: 'post', + path: LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH, + validate: { + body: createValidationFunction(getLogEntryAnomaliesDatasetsRequestPayloadRT), + }, + }, + framework.router.handleLegacyErrors(async (requestContext, request, response) => { + const { + data: { + sourceId, + timeRange: { startTime, endTime }, + }, + } = request.body; + + try { + assertHasInfraMlPlugins(requestContext); + + const { datasets, timing } = await getLogEntryAnomaliesDatasets( + requestContext, + sourceId, + startTime, + endTime + ); + + return response.ok({ + body: getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT.encode({ + data: { + datasets, + }, + timing, + }), + }); + } catch (error) { + if (Boom.isBoom(error)) { + throw error; + } + + if (error instanceof NoLogAnalysisResultsIndexError) { + return response.notFound({ body: { message: error.message } }); + } + + return response.customError({ + statusCode: error.statusCode ?? 500, + body: { + message: error.message ?? 'An unexpected error occurred', + }, + }); + } + }) + ); +}; diff --git a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_rate.ts b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_rate.ts index ae86102980c16..3b05f6ed23aae 100644 --- a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_rate.ts +++ b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_rate.ts @@ -27,7 +27,7 @@ export const initGetLogEntryRateRoute = ({ framework }: InfraBackendLibs) => { }, framework.router.handleLegacyErrors(async (requestContext, request, response) => { const { - data: { sourceId, timeRange, bucketDuration }, + data: { sourceId, timeRange, bucketDuration, datasets }, } = request.body; try { @@ -38,7 +38,8 @@ export const initGetLogEntryRateRoute = ({ framework }: InfraBackendLibs) => { sourceId, timeRange.startTime, timeRange.endTime, - bucketDuration + bucketDuration, + datasets ); return response.ok({ diff --git a/x-pack/plugins/translations/translations/ja-JP.json b/x-pack/plugins/translations/translations/ja-JP.json index c1f36372ec94e..27ff9f2a326e5 100644 --- a/x-pack/plugins/translations/translations/ja-JP.json +++ b/x-pack/plugins/translations/translations/ja-JP.json @@ -7531,7 +7531,6 @@ "xpack.infra.logs.logEntryCategories.categoryQUalityWarningCalloutTitle": "品質に関する警告", "xpack.infra.logs.logEntryCategories.countColumnTitle": "メッセージ数", "xpack.infra.logs.logEntryCategories.datasetColumnTitle": "データセット", - "xpack.infra.logs.logEntryCategories.datasetFilterPlaceholder": "データセットでフィルター", "xpack.infra.logs.logEntryCategories.jobStatusLoadingMessage": "分類ジョブのステータスを確認中...", "xpack.infra.logs.logEntryCategories.loadDataErrorTitle": "カテゴリーデータを読み込めませんでした", "xpack.infra.logs.logEntryCategories.manyCategoriesWarningReasonDescription": "分析されたドキュメントごとのカテゴリ比率が{categoriesDocumentRatio, number }で、非常に高い値です。", diff --git a/x-pack/plugins/translations/translations/zh-CN.json b/x-pack/plugins/translations/translations/zh-CN.json index 7e36d5676585c..5e30559438742 100644 --- a/x-pack/plugins/translations/translations/zh-CN.json +++ b/x-pack/plugins/translations/translations/zh-CN.json @@ -7536,7 +7536,6 @@ "xpack.infra.logs.logEntryCategories.categoryQUalityWarningCalloutTitle": "质量警告", "xpack.infra.logs.logEntryCategories.countColumnTitle": "消息计数", "xpack.infra.logs.logEntryCategories.datasetColumnTitle": "数据集", - "xpack.infra.logs.logEntryCategories.datasetFilterPlaceholder": "按数据集筛选", "xpack.infra.logs.logEntryCategories.jobStatusLoadingMessage": "正在检查归类作业的状态......", "xpack.infra.logs.logEntryCategories.loadDataErrorTitle": "无法加载类别数据", "xpack.infra.logs.logEntryCategories.manyCategoriesWarningReasonDescription": "每个分析文档的类别比率非常高,达到 {categoriesDocumentRatio, number }。",