From 533841c4afa612236e88c047f64ab075a5473f62 Mon Sep 17 00:00:00 2001 From: Liza Katz Date: Wed, 3 Feb 2021 23:15:41 +0200 Subject: [PATCH] [Search Sessions] Save all sessions, with persisted flag (#89570) * [data.search] Add search session methods to search service contract * Fix types * Fix tests and switch to cancel * Update docs * Fix types/tests * Fix tests * Update status of SO before cancelling search requests * Add API integration test * Fix types * Update expiration route to use config defaultExpiration * Fix test * Update docs * New logic for extend * Remove declare module * Search Sessions: Unskip Flaky Functional Test * Review feedback * fix ts * Save all search sessions and then manage them based on their persisted state * Get default search session expiration from config * randomize sleep time * fix test * Remove test that is no longer valid * fix test * Make sure we poll, and dont persist, searches not in the context of a session * Added keepalive unit tests * fix ts * code review @lukasolson * ts * More tests, rename onScreenTimeout to completedTimeout * lint * lint * Delete async seaches * Support saved object pagination Fix get search status tests * better PersistedSearchSessionSavedObjectAttributes ts * test titles * Fix undefined bug * Remove runAt from monitoring task Increase testing trackingInterval (caused bug) * support workload histograms that take into account overdue tasks * Update touched when changing session status to complete \ error * removed test * Updated management test data * Rename configs * delete tap first add comments * Use DataRequestHandlerContext in maps * ts * Fixed ts Co-authored-by: Lukas Olson Co-authored-by: Timothy Sullivan Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Anton Dosov Co-authored-by: Gidi Meir Morris --- src/plugins/data/common/utils/index.ts | 1 - .../data/common/utils/tap_first.test.ts | 19 - src/plugins/data/common/utils/tap_first.ts | 20 - .../data/server/search/search_service.test.ts | 144 ++++ .../data/server/search/search_service.ts | 7 +- .../data/server/search/session/types.ts | 2 +- x-pack/plugins/data_enhanced/common/index.ts | 1 + .../common/search/session/types.ts | 22 +- x-pack/plugins/data_enhanced/config.ts | 36 +- .../public/search/sessions_mgmt/lib/api.ts | 13 +- .../public/search/sessions_mgmt/types.ts | 14 +- x-pack/plugins/data_enhanced/server/plugin.ts | 7 +- .../data_enhanced/server/routes/session.ts | 6 +- .../server/saved_objects/search_session.ts | 9 +- .../server/search/eql_search_strategy.test.ts | 2 - .../server/search/eql_search_strategy.ts | 7 +- .../server/search/es_search_strategy.test.ts | 215 ++++-- .../server/search/es_search_strategy.ts | 16 +- .../server/search/request_utils.ts | 23 +- .../session/check_running_sessions.test.ts | 618 ++++++++++++++---- .../search/session/check_running_sessions.ts | 249 +++++-- .../search/session/get_search_status.test.ts | 30 +- .../search/session/get_search_status.ts | 49 +- .../server/search/session/monitoring_task.ts | 15 +- .../search/session/session_service.test.ts | 547 +++++++--------- .../server/search/session/session_service.ts | 288 +++----- .../server/search/session/types.ts | 4 + .../monitoring/workload_statistics.test.ts | 119 ++++ .../server/monitoring/workload_statistics.ts | 13 +- .../api_integration/apis/search/session.ts | 142 +++- x-pack/test/api_integration/config.ts | 2 + .../data/search_sessions/data.json.gz | Bin 1956 -> 1976 bytes .../data/search_sessions/mappings.json | 6 + .../async_search/send_to_background.ts | 1 + .../send_to_background_relative_time.ts | 31 +- 35 files changed, 1795 insertions(+), 883 deletions(-) delete mode 100644 src/plugins/data/common/utils/tap_first.test.ts delete mode 100644 src/plugins/data/common/utils/tap_first.ts diff --git a/src/plugins/data/common/utils/index.ts b/src/plugins/data/common/utils/index.ts index 8e17464f35172..f79666f669142 100644 --- a/src/plugins/data/common/utils/index.ts +++ b/src/plugins/data/common/utils/index.ts @@ -8,4 +8,3 @@ /** @internal */ export { shortenDottedString } from './shorten_dotted_string'; -export { tapFirst } from './tap_first'; diff --git a/src/plugins/data/common/utils/tap_first.test.ts b/src/plugins/data/common/utils/tap_first.test.ts deleted file mode 100644 index 5535a27df97db..0000000000000 --- a/src/plugins/data/common/utils/tap_first.test.ts +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * and the Server Side Public License, v 1; you may not use this file except in - * compliance with, at your election, the Elastic License or the Server Side - * Public License, v 1. - */ - -import { of } from 'rxjs'; -import { tapFirst } from './tap_first'; - -describe('tapFirst', () => { - it('should tap the first and only the first', () => { - const fn = jest.fn(); - of(1, 2, 3).pipe(tapFirst(fn)).subscribe(); - expect(fn).toBeCalledTimes(1); - expect(fn).lastCalledWith(1); - }); -}); diff --git a/src/plugins/data/common/utils/tap_first.ts b/src/plugins/data/common/utils/tap_first.ts deleted file mode 100644 index d5a9fe19fdbbf..0000000000000 --- a/src/plugins/data/common/utils/tap_first.ts +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * and the Server Side Public License, v 1; you may not use this file except in - * compliance with, at your election, the Elastic License or the Server Side - * Public License, v 1. - */ - -import { pipe } from 'rxjs'; -import { tap } from 'rxjs/operators'; - -export function tapFirst(next: (x: T) => void) { - let isFirst = true; - return pipe( - tap((x: T) => { - if (isFirst) next(x); - isFirst = false; - }) - ); -} diff --git a/src/plugins/data/server/search/search_service.test.ts b/src/plugins/data/server/search/search_service.test.ts index 4b0a280c3c1ca..37b41516611e4 100644 --- a/src/plugins/data/server/search/search_service.test.ts +++ b/src/plugins/data/server/search/search_service.test.ts @@ -17,6 +17,18 @@ import { createIndexPatternsStartMock } from '../index_patterns/mocks'; import { SearchService, SearchServiceSetupDependencies } from './search_service'; import { bfetchPluginMock } from '../../../bfetch/server/mocks'; import { of } from 'rxjs'; +import { + IEsSearchRequest, + IEsSearchResponse, + IScopedSearchClient, + IScopedSearchSessionsClient, + ISearchSessionService, + ISearchStart, + ISearchStrategy, +} from '.'; +// eslint-disable-next-line @kbn/eslint/no-restricted-paths +import { expressionsPluginMock } from '../../../expressions/public/mocks'; +import { createSearchSessionsClientMock } from './mocks'; describe('Search service', () => { let plugin: SearchService; @@ -70,4 +82,136 @@ describe('Search service', () => { expect(start).toHaveProperty('getSearchStrategy'); }); }); + + describe('asScopedProvider', () => { + let mockScopedClient: IScopedSearchClient; + let searcPluginStart: ISearchStart>; + let mockStrategy: jest.Mocked; + let mockSessionService: ISearchSessionService; + let mockSessionClient: jest.Mocked; + const sessionId = '1234'; + + beforeEach(() => { + mockStrategy = { search: jest.fn().mockReturnValue(of({})) }; + + mockSessionClient = createSearchSessionsClientMock(); + mockSessionService = { + asScopedProvider: () => (request: any) => mockSessionClient, + }; + + const pluginSetup = plugin.setup(mockCoreSetup, { + bfetch: bfetchPluginMock.createSetupContract(), + expressions: expressionsPluginMock.createSetupContract(), + }); + pluginSetup.registerSearchStrategy('es', mockStrategy); + pluginSetup.__enhance({ + defaultStrategy: 'es', + sessionService: mockSessionService, + }); + + searcPluginStart = plugin.start(mockCoreStart, { + fieldFormats: createFieldFormatsStartMock(), + indexPatterns: createIndexPatternsStartMock(), + }); + + const r: any = {}; + + mockScopedClient = searcPluginStart.asScoped(r); + }); + + describe('search', () => { + it('searches using the original request if not restoring, trackId is not called if there is no id in the response', async () => { + const searchRequest = { params: {} }; + const options = { sessionId, isStored: false, isRestore: false }; + mockSessionClient.trackId = jest.fn(); + + mockStrategy.search.mockReturnValue( + of({ + rawResponse: {} as any, + }) + ); + + await mockScopedClient.search(searchRequest, options).toPromise(); + + const [request, callOptions] = mockStrategy.search.mock.calls[0]; + + expect(callOptions).toBe(options); + expect(request).toBe(searchRequest); + expect(mockSessionClient.trackId).not.toBeCalled(); + }); + + it('searches using the original request if `id` is provided', async () => { + const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0'; + const searchRequest = { id: searchId, params: {} }; + const options = { sessionId, isStored: true, isRestore: true }; + + await mockScopedClient.search(searchRequest, options).toPromise(); + + const [request, callOptions] = mockStrategy.search.mock.calls[0]; + expect(callOptions).toBe(options); + expect(request).toBe(searchRequest); + }); + + it('searches by looking up an `id` if restoring and `id` is not provided', async () => { + const searchRequest = { params: {} }; + const options = { sessionId, isStored: true, isRestore: true }; + + mockSessionClient.getId = jest.fn().mockResolvedValueOnce('my_id'); + + await mockScopedClient.search(searchRequest, options).toPromise(); + + const [request, callOptions] = mockStrategy.search.mock.calls[0]; + expect(callOptions).toBe(options); + expect(request).toStrictEqual({ ...searchRequest, id: 'my_id' }); + }); + + it('calls `trackId` for every response, if the response contains an `id` and not restoring', async () => { + const searchRequest = { params: {} }; + const options = { sessionId, isStored: false, isRestore: false }; + mockSessionClient.trackId = jest.fn(); + + mockStrategy.search.mockReturnValue( + of( + { + id: 'my_id', + rawResponse: {} as any, + }, + { + id: 'my_id', + rawResponse: {} as any, + } + ) + ); + + await mockScopedClient.search(searchRequest, options).toPromise(); + + expect(mockSessionClient.trackId).toBeCalledTimes(2); + + expect(mockSessionClient.trackId.mock.calls[0]).toEqual([searchRequest, 'my_id', options]); + expect(mockSessionClient.trackId.mock.calls[1]).toEqual([searchRequest, 'my_id', options]); + }); + + it('does not call `trackId` if restoring', async () => { + const searchRequest = { params: {} }; + const options = { sessionId, isStored: true, isRestore: true }; + mockSessionClient.getId = jest.fn().mockResolvedValueOnce('my_id'); + mockSessionClient.trackId = jest.fn(); + + await mockScopedClient.search(searchRequest, options).toPromise(); + + expect(mockSessionClient.trackId).not.toBeCalled(); + }); + + it('does not call `trackId` if no session id provided', async () => { + const searchRequest = { params: {} }; + const options = {}; + mockSessionClient.getId = jest.fn().mockResolvedValueOnce('my_id'); + mockSessionClient.trackId = jest.fn(); + + await mockScopedClient.search(searchRequest, options).toPromise(); + + expect(mockSessionClient.trackId).not.toBeCalled(); + }); + }); + }); }); diff --git a/src/plugins/data/server/search/search_service.ts b/src/plugins/data/server/search/search_service.ts index 34aefe33e4402..24a2eff68482f 100644 --- a/src/plugins/data/server/search/search_service.ts +++ b/src/plugins/data/server/search/search_service.ts @@ -19,7 +19,7 @@ import { SharedGlobalConfig, StartServicesAccessor, } from 'src/core/server'; -import { first, switchMap } from 'rxjs/operators'; +import { first, switchMap, tap } from 'rxjs/operators'; import { BfetchServerSetup } from 'src/plugins/bfetch/server'; import { ExpressionsServerSetup } from 'src/plugins/expressions/server'; import type { @@ -65,7 +65,6 @@ import { aggShardDelay } from '../../common/search/aggs/buckets/shard_delay_fn'; import { ConfigSchema } from '../../config'; import { ISearchSessionService, SearchSessionService } from './session'; import { KbnServerError } from '../../../kibana_utils/server'; -import { tapFirst } from '../../common'; import { registerBsearchRoute } from './routes/bsearch'; type StrategyMap = Record>; @@ -274,8 +273,8 @@ export class SearchService implements Plugin { return from(getSearchRequest()).pipe( switchMap((searchRequest) => strategy.search(searchRequest, options, deps)), - tapFirst((response) => { - if (request.id || !options.sessionId || !response.id || options.isRestore) return; + tap((response) => { + if (!options.sessionId || !response.id || options.isRestore) return; deps.searchSessionsClient.trackId(request, response.id, options); }) ); diff --git a/src/plugins/data/server/search/session/types.ts b/src/plugins/data/server/search/session/types.ts index 3c074955a108e..d3220c8f7fbca 100644 --- a/src/plugins/data/server/search/session/types.ts +++ b/src/plugins/data/server/search/session/types.ts @@ -24,7 +24,7 @@ export interface IScopedSearchSessionsClient { options: ISearchOptions ) => Promise; getSearchIdMapping: (sessionId: string) => Promise>; - save: (sessionId: string, attributes: Partial) => Promise>; + save: (sessionId: string, attributes: Partial) => Promise | undefined>; get: (sessionId: string) => Promise>; find: (options: Omit) => Promise>; update: (sessionId: string, attributes: Partial) => Promise>; diff --git a/x-pack/plugins/data_enhanced/common/index.ts b/x-pack/plugins/data_enhanced/common/index.ts index 669c33230a34c..8c500ef21ffcf 100644 --- a/x-pack/plugins/data_enhanced/common/index.ts +++ b/x-pack/plugins/data_enhanced/common/index.ts @@ -5,6 +5,7 @@ */ export { + SEARCH_SESSION_TYPE, ENHANCED_ES_SEARCH_STRATEGY, EQL_SEARCH_STRATEGY, EqlRequestParams, diff --git a/x-pack/plugins/data_enhanced/common/search/session/types.ts b/x-pack/plugins/data_enhanced/common/search/session/types.ts index 9eefdf43aa245..6d07f4b731fae 100644 --- a/x-pack/plugins/data_enhanced/common/search/session/types.ts +++ b/x-pack/plugins/data_enhanced/common/search/session/types.ts @@ -6,19 +6,25 @@ import { SearchSessionStatus } from './'; +export const SEARCH_SESSION_TYPE = 'search-session'; export interface SearchSessionSavedObjectAttributes { + sessionId: string; /** * User-facing session name to be displayed in session management */ - name: string; + name?: string; /** * App that created the session. e.g 'discover' */ - appId: string; + appId?: string; /** * Creation time of the session */ created: string; + /** + * Last touch time of the session + */ + touched: string; /** * Expiration time of the session. Expiration itself is managed by Elasticsearch. */ @@ -30,22 +36,28 @@ export interface SearchSessionSavedObjectAttributes { /** * urlGeneratorId */ - urlGeneratorId: string; + urlGeneratorId?: string; /** * The application state that was used to create the session. * Should be used, for example, to re-load an expired search session. */ - initialState: Record; + initialState?: Record; /** * Application state that should be used to restore the session. * For example, relative dates are conveted to absolute ones. */ - restoreState: Record; + restoreState?: Record; /** * Mapping of search request hashes to their corresponsing info (async search id, etc.) */ idMapping: Record; + + /** + * This value is true if the session was actively stored by the user. If it is false, the session may be purged by the system. + */ + persisted: boolean; } + export interface SearchSessionRequestInfo { /** * ID of the async search request diff --git a/x-pack/plugins/data_enhanced/config.ts b/x-pack/plugins/data_enhanced/config.ts index 981c398019832..3c2c2084b2e2c 100644 --- a/x-pack/plugins/data_enhanced/config.ts +++ b/x-pack/plugins/data_enhanced/config.ts @@ -9,15 +9,49 @@ import { schema, TypeOf } from '@kbn/config-schema'; export const configSchema = schema.object({ search: schema.object({ sessions: schema.object({ + /** + * Turns the feature on \ off (incl. removing indicator and management screens) + */ enabled: schema.boolean({ defaultValue: false }), + /** + * pageSize controls how many search session objects we load at once while monitoring + * session completion + */ pageSize: schema.number({ defaultValue: 10000 }), + /** + * trackingInterval controls how often we track search session objects progress + */ trackingInterval: schema.duration({ defaultValue: '10s' }), - inMemTimeout: schema.duration({ defaultValue: '1m' }), + /** + * notTouchedTimeout controls how long do we store unpersisted search session results, + * after the last search in the session has completed + */ + notTouchedTimeout: schema.duration({ defaultValue: '5m' }), + /** + * notTouchedInProgressTimeout controls how long do allow a search session to run after + * a user has navigated away without persisting + */ + notTouchedInProgressTimeout: schema.duration({ defaultValue: '1m' }), + /** + * maxUpdateRetries controls how many retries we perform while attempting to save a search session + */ maxUpdateRetries: schema.number({ defaultValue: 3 }), + /** + * defaultExpiration controls how long search sessions are valid for, until they are expired. + */ defaultExpiration: schema.duration({ defaultValue: '7d' }), management: schema.object({ + /** + * maxSessions controls how many saved search sessions we display per page on the management screen. + */ maxSessions: schema.number({ defaultValue: 10000 }), + /** + * refreshInterval controls how often we refresh the management screen. + */ refreshInterval: schema.duration({ defaultValue: '10s' }), + /** + * refreshTimeout controls how often we refresh the management screen. + */ refreshTimeout: schema.duration({ defaultValue: '1m' }), expiresSoonWarning: schema.duration({ defaultValue: '1d' }), }), diff --git a/x-pack/plugins/data_enhanced/public/search/sessions_mgmt/lib/api.ts b/x-pack/plugins/data_enhanced/public/search/sessions_mgmt/lib/api.ts index c6a3d088b3cda..25c06d1d2e278 100644 --- a/x-pack/plugins/data_enhanced/public/search/sessions_mgmt/lib/api.ts +++ b/x-pack/plugins/data_enhanced/public/search/sessions_mgmt/lib/api.ts @@ -10,12 +10,11 @@ import moment from 'moment'; import { from, race, timer } from 'rxjs'; import { mapTo, tap } from 'rxjs/operators'; import type { SharePluginStart } from 'src/plugins/share/public'; -import { SessionsConfigSchema } from '../'; -import type { ISessionsClient } from '../../../../../../../src/plugins/data/public'; -import type { SearchSessionSavedObjectAttributes } from '../../../../common'; +import { ISessionsClient } from '../../../../../../../src/plugins/data/public'; import { SearchSessionStatus } from '../../../../common/search'; import { ACTION } from '../components/actions'; -import { UISession } from '../types'; +import { PersistedSearchSessionSavedObjectAttributes, UISession } from '../types'; +import { SessionsConfigSchema } from '..'; type UrlGeneratorsStart = SharePluginStart['urlGenerators']; @@ -48,7 +47,7 @@ async function getUrlFromState( // Helper: factory for a function to map server objects to UI objects const mapToUISession = (urls: UrlGeneratorsStart, config: SessionsConfigSchema) => async ( - savedObject: SavedObject + savedObject: SavedObject ): Promise => { const { name, @@ -110,6 +109,8 @@ export class SearchSessionsMgmtAPI { perPage: mgmtConfig.maxSessions, sortField: 'created', sortOrder: 'asc', + searchFields: ['persisted'], + search: 'true', }) ); const timeout$ = timer(refreshTimeout.asMilliseconds()).pipe( @@ -129,7 +130,7 @@ export class SearchSessionsMgmtAPI { const result = await race(fetch$, timeout$).toPromise(); if (result && result.saved_objects) { const savedObjects = result.saved_objects as Array< - SavedObject + SavedObject >; return await Promise.all(savedObjects.map(mapToUISession(this.deps.urls, this.config))); } diff --git a/x-pack/plugins/data_enhanced/public/search/sessions_mgmt/types.ts b/x-pack/plugins/data_enhanced/public/search/sessions_mgmt/types.ts index 78b91f7ca8ac2..3b0159a1e8faa 100644 --- a/x-pack/plugins/data_enhanced/public/search/sessions_mgmt/types.ts +++ b/x-pack/plugins/data_enhanced/public/search/sessions_mgmt/types.ts @@ -4,11 +4,23 @@ * you may not use this file except in compliance with the Elastic License. */ -import { SearchSessionStatus } from '../../../common'; +import { SearchSessionSavedObjectAttributes, SearchSessionStatus } from '../../../common'; import { ACTION } from './components/actions'; export const DATE_STRING_FORMAT = 'D MMM, YYYY, HH:mm:ss'; +/** + * Some properties are optional for a non-persisted Search Session. + * This interface makes them mandatory, because management only shows persisted search sessions. + */ +export type PersistedSearchSessionSavedObjectAttributes = SearchSessionSavedObjectAttributes & + Required< + Pick< + SearchSessionSavedObjectAttributes, + 'name' | 'appId' | 'urlGeneratorId' | 'initialState' | 'restoreState' + > + >; + export interface UISession { id: string; name: string; diff --git a/x-pack/plugins/data_enhanced/server/plugin.ts b/x-pack/plugins/data_enhanced/server/plugin.ts index cff0ee3efd738..834f1669e2d7e 100644 --- a/x-pack/plugins/data_enhanced/server/plugin.ts +++ b/x-pack/plugins/data_enhanced/server/plugin.ts @@ -5,6 +5,7 @@ */ import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from 'kibana/server'; +import { Observable } from 'rxjs'; import { TaskManagerSetupContract, TaskManagerStartContract } from '../../task_manager/server'; import { PluginSetup as DataPluginSetup, @@ -22,6 +23,7 @@ import { } from './search'; import { getUiSettings } from './ui_settings'; import type { DataEnhancedRequestHandlerContext } from './type'; +import { ConfigSchema } from '../config'; interface SetupDependencies { data: DataPluginSetup; @@ -37,9 +39,11 @@ export class EnhancedDataServerPlugin implements Plugin { private readonly logger: Logger; private sessionService!: SearchSessionService; + private config$: Observable; - constructor(private initializerContext: PluginInitializerContext) { + constructor(private initializerContext: PluginInitializerContext) { this.logger = initializerContext.logger.get('data_enhanced'); + this.config$ = this.initializerContext.config.create(); } public setup(core: CoreSetup, deps: SetupDependencies) { @@ -51,6 +55,7 @@ export class EnhancedDataServerPlugin deps.data.search.registerSearchStrategy( ENHANCED_ES_SEARCH_STRATEGY, enhancedEsSearchStrategyProvider( + this.config$, this.initializerContext.config.legacy.globalConfig$, this.logger, usage diff --git a/x-pack/plugins/data_enhanced/server/routes/session.ts b/x-pack/plugins/data_enhanced/server/routes/session.ts index b39ffd41f33c8..4855021a54f89 100644 --- a/x-pack/plugins/data_enhanced/server/routes/session.ts +++ b/x-pack/plugins/data_enhanced/server/routes/session.ts @@ -91,11 +91,13 @@ export function registerSessionRoutes(router: DataEnhancedPluginRouter, logger: sortField: schema.maybe(schema.string()), sortOrder: schema.maybe(schema.string()), filter: schema.maybe(schema.string()), + searchFields: schema.maybe(schema.arrayOf(schema.string())), + search: schema.maybe(schema.string()), }), }, }, async (context, request, res) => { - const { page, perPage, sortField, sortOrder, filter } = request.body; + const { page, perPage, sortField, sortOrder, filter, searchFields, search } = request.body; try { const response = await context.search!.findSessions({ page, @@ -103,6 +105,8 @@ export function registerSessionRoutes(router: DataEnhancedPluginRouter, logger: sortField, sortOrder, filter, + searchFields, + search, }); return res.ok({ diff --git a/x-pack/plugins/data_enhanced/server/saved_objects/search_session.ts b/x-pack/plugins/data_enhanced/server/saved_objects/search_session.ts index 4e75ffaeec69a..16472199de4d9 100644 --- a/x-pack/plugins/data_enhanced/server/saved_objects/search_session.ts +++ b/x-pack/plugins/data_enhanced/server/saved_objects/search_session.ts @@ -5,8 +5,7 @@ */ import { SavedObjectsType } from 'kibana/server'; - -export const SEARCH_SESSION_TYPE = 'search-session'; +import { SEARCH_SESSION_TYPE } from '../../common'; export const searchSessionMapping: SavedObjectsType = { name: SEARCH_SESSION_TYPE, @@ -14,6 +13,9 @@ export const searchSessionMapping: SavedObjectsType = { hidden: true, mappings: { properties: { + persisted: { + type: 'boolean', + }, sessionId: { type: 'keyword', }, @@ -26,6 +28,9 @@ export const searchSessionMapping: SavedObjectsType = { expires: { type: 'date', }, + touched: { + type: 'date', + }, status: { type: 'keyword', }, diff --git a/x-pack/plugins/data_enhanced/server/search/eql_search_strategy.test.ts b/x-pack/plugins/data_enhanced/server/search/eql_search_strategy.test.ts index f2d7725954a26..1670b1116eedb 100644 --- a/x-pack/plugins/data_enhanced/server/search/eql_search_strategy.test.ts +++ b/x-pack/plugins/data_enhanced/server/search/eql_search_strategy.test.ts @@ -117,7 +117,6 @@ describe('EQL search strategy', () => { expect(request).toEqual( expect.objectContaining({ wait_for_completion_timeout: '100ms', - keep_alive: '1m', }) ); }); @@ -156,7 +155,6 @@ describe('EQL search strategy', () => { expect(request).toEqual( expect.objectContaining({ wait_for_completion_timeout: '5ms', - keep_alive: '1m', keep_on_completion: false, }) ); diff --git a/x-pack/plugins/data_enhanced/server/search/eql_search_strategy.ts b/x-pack/plugins/data_enhanced/server/search/eql_search_strategy.ts index a0d4e9dcd19b9..65ce5bdf5255c 100644 --- a/x-pack/plugins/data_enhanced/server/search/eql_search_strategy.ts +++ b/x-pack/plugins/data_enhanced/server/search/eql_search_strategy.ts @@ -22,7 +22,8 @@ export const eqlSearchStrategyProvider = ( logger: Logger ): ISearchStrategy => { async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) { - await esClient.asCurrentUser.asyncSearch.delete({ id }); + const client = esClient.asCurrentUser.eql; + await client.delete({ id }); } return { @@ -41,11 +42,11 @@ export const eqlSearchStrategyProvider = ( uiSettingsClient ); const params = id - ? getDefaultAsyncGetParams() + ? getDefaultAsyncGetParams(options) : { ...(await getIgnoreThrottled(uiSettingsClient)), ...defaultParams, - ...getDefaultAsyncGetParams(), + ...getDefaultAsyncGetParams(options), ...request.params, }; const promise = id diff --git a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.test.ts b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.test.ts index b2ddd0310f8f5..98238f50fa059 100644 --- a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.test.ts +++ b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.test.ts @@ -7,6 +7,7 @@ import { enhancedEsSearchStrategyProvider } from './es_search_strategy'; import { BehaviorSubject } from 'rxjs'; import { SearchStrategyDependencies } from '../../../../../src/plugins/data/server/search'; +import moment from 'moment'; import { KbnServerError } from '../../../../../src/plugins/kibana_utils/server'; import { ElasticsearchClientError, ResponseError } from '@elastic/elasticsearch/lib/errors'; import * as indexNotFoundException from '../../../../../src/plugins/data/common/search/test_data/index_not_found_exception.json'; @@ -60,7 +61,7 @@ describe('ES search strategy', () => { }, }, } as unknown) as SearchStrategyDependencies; - const mockConfig$ = new BehaviorSubject({ + const mockLegacyConfig$ = new BehaviorSubject({ elasticsearch: { shardTimeout: { asMilliseconds: () => { @@ -70,6 +71,14 @@ describe('ES search strategy', () => { }, }); + const mockConfig$ = new BehaviorSubject({ + search: { + sessions: { + defaultExpiration: moment.duration('1', 'm'), + }, + }, + }); + beforeEach(() => { mockApiCaller.mockClear(); mockGetCaller.mockClear(); @@ -78,76 +87,140 @@ describe('ES search strategy', () => { }); it('returns a strategy with `search and `cancel`', async () => { - const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); expect(typeof esSearch.search).toBe('function'); }); describe('search', () => { - it('makes a POST request to async search with params when no ID is provided', async () => { - mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse); + describe('no sessionId', () => { + it('makes a POST request with params when no ID provided', async () => { + mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse); - const params = { index: 'logstash-*', body: { query: {} } }; - const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); + const params = { index: 'logstash-*', body: { query: {} } }; + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); - await esSearch.search({ params }, {}, mockDeps).toPromise(); + await esSearch.search({ params }, {}, mockDeps).toPromise(); - expect(mockSubmitCaller).toBeCalled(); - const request = mockSubmitCaller.mock.calls[0][0]; - expect(request.index).toEqual(params.index); - expect(request.body).toEqual(params.body); - }); + expect(mockSubmitCaller).toBeCalled(); + const request = mockSubmitCaller.mock.calls[0][0]; + expect(request.index).toEqual(params.index); + expect(request.body).toEqual(params.body); + expect(request).toHaveProperty('keep_alive', '1m'); + }); - it('makes a GET request to async search with ID when ID is provided', async () => { - mockGetCaller.mockResolvedValueOnce(mockAsyncResponse); + it('makes a GET request to async search with ID', async () => { + mockGetCaller.mockResolvedValueOnce(mockAsyncResponse); - const params = { index: 'logstash-*', body: { query: {} } }; - const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); + const params = { index: 'logstash-*', body: { query: {} } }; + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); - await esSearch.search({ id: 'foo', params }, {}, mockDeps).toPromise(); + await esSearch.search({ id: 'foo', params }, {}, mockDeps).toPromise(); - expect(mockGetCaller).toBeCalled(); - const request = mockGetCaller.mock.calls[0][0]; - expect(request.id).toEqual('foo'); - expect(request).toHaveProperty('wait_for_completion_timeout'); - expect(request).toHaveProperty('keep_alive'); - }); + expect(mockGetCaller).toBeCalled(); + const request = mockGetCaller.mock.calls[0][0]; + expect(request.id).toEqual('foo'); + expect(request).toHaveProperty('wait_for_completion_timeout'); + expect(request).toHaveProperty('keep_alive', '1m'); + }); + + it('sets wait_for_completion_timeout and keep_alive in the request', async () => { + mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse); + + const params = { index: 'foo-*', body: {} }; + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); + + await esSearch.search({ params }, {}, mockDeps).toPromise(); + + expect(mockSubmitCaller).toBeCalled(); + const request = mockSubmitCaller.mock.calls[0][0]; + expect(request).toHaveProperty('wait_for_completion_timeout'); + expect(request).toHaveProperty('keep_alive'); + }); - it('calls the rollup API if the index is a rollup type', async () => { - mockApiCaller.mockResolvedValueOnce(mockRollupResponse); - - const params = { index: 'foo-程', body: {} }; - const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); - - await esSearch - .search( - { - indexType: 'rollup', - params, - }, - {}, - mockDeps - ) - .toPromise(); - - expect(mockApiCaller).toBeCalled(); - const { method, path } = mockApiCaller.mock.calls[0][0]; - expect(method).toBe('POST'); - expect(path).toBe('/foo-%E7%A8%8B/_rollup_search'); + it('calls the rollup API if the index is a rollup type', async () => { + mockApiCaller.mockResolvedValueOnce(mockRollupResponse); + + const params = { index: 'foo-程', body: {} }; + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); + + await esSearch + .search( + { + indexType: 'rollup', + params, + }, + {}, + mockDeps + ) + .toPromise(); + + expect(mockApiCaller).toBeCalled(); + const { method, path } = mockApiCaller.mock.calls[0][0]; + expect(method).toBe('POST'); + expect(path).toBe('/foo-%E7%A8%8B/_rollup_search'); + }); }); - it('sets wait_for_completion_timeout and keep_alive in the request', async () => { - mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse); + describe('with sessionId', () => { + it('makes a POST request with params (long keepalive)', async () => { + mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse); - const params = { index: 'foo-*', body: {} }; - const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); + const params = { index: 'logstash-*', body: { query: {} } }; + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); - await esSearch.search({ params }, {}, mockDeps).toPromise(); + await esSearch.search({ params }, { sessionId: '1' }, mockDeps).toPromise(); - expect(mockSubmitCaller).toBeCalled(); - const request = mockSubmitCaller.mock.calls[0][0]; - expect(request).toHaveProperty('wait_for_completion_timeout'); - expect(request).toHaveProperty('keep_alive'); + expect(mockSubmitCaller).toBeCalled(); + const request = mockSubmitCaller.mock.calls[0][0]; + expect(request.index).toEqual(params.index); + expect(request.body).toEqual(params.body); + + expect(request).toHaveProperty('keep_alive', '60000ms'); + }); + + it('makes a GET request to async search without keepalive', async () => { + mockGetCaller.mockResolvedValueOnce(mockAsyncResponse); + + const params = { index: 'logstash-*', body: { query: {} } }; + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); + + await esSearch.search({ id: 'foo', params }, { sessionId: '1' }, mockDeps).toPromise(); + + expect(mockGetCaller).toBeCalled(); + const request = mockGetCaller.mock.calls[0][0]; + expect(request.id).toEqual('foo'); + expect(request).toHaveProperty('wait_for_completion_timeout'); + expect(request).not.toHaveProperty('keep_alive'); + }); }); it('throws normalized error if ResponseError is thrown', async () => { @@ -162,7 +235,11 @@ describe('ES search strategy', () => { mockSubmitCaller.mockRejectedValue(errResponse); const params = { index: 'logstash-*', body: { query: {} } }; - const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); let err: KbnServerError | undefined; try { @@ -183,7 +260,11 @@ describe('ES search strategy', () => { mockSubmitCaller.mockRejectedValue(errResponse); const params = { index: 'logstash-*', body: { query: {} } }; - const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); let err: KbnServerError | undefined; try { @@ -204,7 +285,11 @@ describe('ES search strategy', () => { mockDeleteCaller.mockResolvedValueOnce(200); const id = 'some_id'; - const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); await esSearch.cancel!(id, {}, mockDeps); @@ -224,7 +309,11 @@ describe('ES search strategy', () => { mockDeleteCaller.mockRejectedValue(errResponse); const id = 'some_id'; - const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); let err: KbnServerError | undefined; try { @@ -247,7 +336,11 @@ describe('ES search strategy', () => { const id = 'some_other_id'; const keepAlive = '1d'; - const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); await esSearch.extend!(id, keepAlive, {}, mockDeps); @@ -262,7 +355,11 @@ describe('ES search strategy', () => { const id = 'some_other_id'; const keepAlive = '1d'; - const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); + const esSearch = await enhancedEsSearchStrategyProvider( + mockConfig$, + mockLegacyConfig$, + mockLogger + ); let err: KbnServerError | undefined; try { diff --git a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts index dc1fa13d32e27..64b1e1a57b489 100644 --- a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts +++ b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts @@ -34,10 +34,12 @@ import { } from './request_utils'; import { toAsyncKibanaSearchResponse } from './response_utils'; import { AsyncSearchResponse } from './types'; +import { ConfigSchema } from '../../config'; import { getKbnServerError, KbnServerError } from '../../../../../src/plugins/kibana_utils/server'; export const enhancedEsSearchStrategyProvider = ( - config$: Observable, + config$: Observable, + legacyConfig$: Observable, logger: Logger, usage?: SearchUsage ): ISearchStrategy => { @@ -57,9 +59,13 @@ export const enhancedEsSearchStrategyProvider = ( const client = esClient.asCurrentUser.asyncSearch; const search = async () => { + const config = await config$.pipe(first()).toPromise(); const params = id - ? getDefaultAsyncGetParams() - : { ...(await getDefaultAsyncSubmitParams(uiSettingsClient, options)), ...request.params }; + ? getDefaultAsyncGetParams(options) + : { + ...(await getDefaultAsyncSubmitParams(uiSettingsClient, config, options)), + ...request.params, + }; const promise = id ? client.get({ ...params, id }) : client.submit(params); @@ -88,12 +94,12 @@ export const enhancedEsSearchStrategyProvider = ( options: ISearchOptions, { esClient, uiSettingsClient }: SearchStrategyDependencies ): Promise { - const config = await config$.pipe(first()).toPromise(); + const legacyConfig = await legacyConfig$.pipe(first()).toPromise(); const { body, index, ...params } = request.params!; const method = 'POST'; const path = encodeURI(`/${index}/_rollup_search`); const querystring = { - ...getShardTimeout(config), + ...getShardTimeout(legacyConfig), ...(await getIgnoreThrottled(uiSettingsClient)), ...(await getDefaultSearchParams(uiSettingsClient)), ...params, diff --git a/x-pack/plugins/data_enhanced/server/search/request_utils.ts b/x-pack/plugins/data_enhanced/server/search/request_utils.ts index f54ab2199c905..d9ef3ab3292c3 100644 --- a/x-pack/plugins/data_enhanced/server/search/request_utils.ts +++ b/x-pack/plugins/data_enhanced/server/search/request_utils.ts @@ -11,6 +11,7 @@ import { } from '@elastic/elasticsearch/api/requestParams'; import { ISearchOptions, UI_SETTINGS } from '../../../../../src/plugins/data/common'; import { getDefaultSearchParams } from '../../../../../src/plugins/data/server'; +import { ConfigSchema } from '../../config'; /** * @internal @@ -27,6 +28,7 @@ export async function getIgnoreThrottled( */ export async function getDefaultAsyncSubmitParams( uiSettingsClient: IUiSettingsClient, + config: ConfigSchema, options: ISearchOptions ): Promise< Pick< @@ -44,21 +46,30 @@ export async function getDefaultAsyncSubmitParams( return { batched_reduce_size: 64, keep_on_completion: !!options.sessionId, // Always return an ID, even if the request completes quickly - ...getDefaultAsyncGetParams(), + ...getDefaultAsyncGetParams(options), ...(await getIgnoreThrottled(uiSettingsClient)), ...(await getDefaultSearchParams(uiSettingsClient)), + ...(options.sessionId + ? { + keep_alive: `${config.search.sessions.defaultExpiration.asMilliseconds()}ms`, + } + : {}), }; } /** @internal */ -export function getDefaultAsyncGetParams(): Pick< - AsyncSearchGet, - 'keep_alive' | 'wait_for_completion_timeout' -> { +export function getDefaultAsyncGetParams( + options: ISearchOptions +): Pick { return { - keep_alive: '1m', // Extend the TTL for this search request by one minute wait_for_completion_timeout: '100ms', // Wait up to 100ms for the response to return + ...(options.sessionId + ? undefined + : { + keep_alive: '1m', + // We still need to do polling for searches not within the context of a search session + }), }; } diff --git a/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.test.ts b/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.test.ts index 4334ab3bc2903..352edc4639631 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.test.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.test.ts @@ -5,187 +5,569 @@ */ import { checkRunningSessions } from './check_running_sessions'; -import { SearchSessionStatus, SearchSessionSavedObjectAttributes } from '../../../common'; +import { + SearchSessionStatus, + SearchSessionSavedObjectAttributes, + ENHANCED_ES_SEARCH_STRATEGY, + EQL_SEARCH_STRATEGY, +} from '../../../common'; import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks'; import type { SavedObjectsClientContract } from 'kibana/server'; -import { SearchStatus } from './types'; +import { SearchSessionsConfig, SearchStatus } from './types'; +import moment from 'moment'; describe('getSearchStatus', () => { let mockClient: any; let savedObjectsClient: jest.Mocked; + const config: SearchSessionsConfig = { + enabled: true, + pageSize: 5, + notTouchedInProgressTimeout: moment.duration(1, 'm'), + notTouchedTimeout: moment.duration(5, 'm'), + maxUpdateRetries: 3, + defaultExpiration: moment.duration(7, 'd'), + trackingInterval: moment.duration(10, 's'), + management: {} as any, + }; const mockLogger: any = { debug: jest.fn(), warn: jest.fn(), error: jest.fn(), }; + const emptySO = { + persisted: false, + status: SearchSessionStatus.IN_PROGRESS, + created: moment().subtract(moment.duration(3, 'm')), + touched: moment().subtract(moment.duration(10, 's')), + idMapping: {}, + }; + beforeEach(() => { savedObjectsClient = savedObjectsClientMock.create(); mockClient = { asyncSearch: { status: jest.fn(), + delete: jest.fn(), + }, + eql: { + status: jest.fn(), + delete: jest.fn(), }, }; }); test('does nothing if there are no open sessions', async () => { - savedObjectsClient.bulkUpdate = jest.fn(); savedObjectsClient.find.mockResolvedValue({ saved_objects: [], total: 0, } as any); - await checkRunningSessions(savedObjectsClient, mockClient, mockLogger); + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + expect(savedObjectsClient.delete).not.toBeCalled(); }); - test('does nothing if there are no searchIds in the saved object', async () => { - savedObjectsClient.bulkUpdate = jest.fn(); - savedObjectsClient.find.mockResolvedValue({ - saved_objects: [ + describe('pagination', () => { + test('fetches one page if not objects exist', async () => { + savedObjectsClient.find.mockResolvedValueOnce({ + saved_objects: [], + total: 0, + } as any); + + await checkRunningSessions( { - attributes: { - idMapping: {}, - }, + savedObjectsClient, + client: mockClient, + logger: mockLogger, }, - ], - total: 1, - } as any); + config + ); - await checkRunningSessions(savedObjectsClient, mockClient, mockLogger); + expect(savedObjectsClient.find).toHaveBeenCalledTimes(1); + }); - expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + test('fetches one page if less than page size object are returned', async () => { + savedObjectsClient.find.mockResolvedValueOnce({ + saved_objects: [emptySO, emptySO], + total: 5, + } as any); + + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); + + expect(savedObjectsClient.find).toHaveBeenCalledTimes(1); + }); + + test('fetches two pages if exactly page size objects are returned', async () => { + let i = 0; + savedObjectsClient.find.mockImplementation(() => { + return new Promise((resolve) => { + resolve({ + saved_objects: i++ === 0 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [], + total: 5, + page: i, + } as any); + }); + }); + + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); + + expect(savedObjectsClient.find).toHaveBeenCalledTimes(2); + + // validate that page number increases + const { page: page1 } = savedObjectsClient.find.mock.calls[0][0]; + const { page: page2 } = savedObjectsClient.find.mock.calls[1][0]; + expect(page1).toBe(1); + expect(page2).toBe(2); + }); + + test('fetches two pages if page size +1 objects are returned', async () => { + let i = 0; + savedObjectsClient.find.mockImplementation(() => { + return new Promise((resolve) => { + resolve({ + saved_objects: i++ === 0 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [emptySO], + total: 5, + page: i, + } as any); + }); + }); + + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); + + expect(savedObjectsClient.find).toHaveBeenCalledTimes(2); + }); }); - test('does nothing if the search is still running', async () => { - savedObjectsClient.bulkUpdate = jest.fn(); - const so = { - attributes: { - idMapping: { - 'search-hash': { - id: 'search-id', - strategy: 'cool', - status: SearchStatus.IN_PROGRESS, + describe('delete', () => { + test('doesnt delete a persisted session', async () => { + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [ + { + id: '123', + attributes: { + persisted: true, + status: SearchSessionStatus.IN_PROGRESS, + created: moment().subtract(moment.duration(30, 'm')), + touched: moment().subtract(moment.duration(10, 'm')), + idMapping: {}, + }, }, + ], + total: 1, + } as any); + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, }, - }, - }; - savedObjectsClient.find.mockResolvedValue({ - saved_objects: [so], - total: 1, - } as any); + config + ); - mockClient.asyncSearch.status.mockResolvedValue({ - body: { - is_partial: true, - is_running: true, - }, + expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + expect(savedObjectsClient.delete).not.toBeCalled(); }); - await checkRunningSessions(savedObjectsClient, mockClient, mockLogger); + test('doesnt delete a non persisted, recently touched session', async () => { + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [ + { + id: '123', + attributes: { + persisted: false, + status: SearchSessionStatus.IN_PROGRESS, + created: moment().subtract(moment.duration(3, 'm')), + touched: moment().subtract(moment.duration(10, 's')), + idMapping: {}, + }, + }, + ], + total: 1, + } as any); + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); - expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); - }); + expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + expect(savedObjectsClient.delete).not.toBeCalled(); + }); - test("doesn't re-check completed or errored searches", async () => { - savedObjectsClient.bulkUpdate = jest.fn(); - const so = { - attributes: { - idMapping: { - 'search-hash': { - id: 'search-id', - strategy: 'cool', - status: SearchStatus.COMPLETE, + test('doesnt delete a non persisted, completed session, within on screen time frame', async () => { + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [ + { + id: '123', + attributes: { + persisted: false, + status: SearchSessionStatus.COMPLETE, + created: moment().subtract(moment.duration(3, 'm')), + touched: moment().subtract(moment.duration(1, 'm')), + idMapping: { + 'search-hash': { + id: 'search-id', + strategy: 'cool', + status: SearchStatus.COMPLETE, + }, + }, + }, }, - 'another-search-hash': { - id: 'search-id', - strategy: 'cool', - status: SearchStatus.ERROR, + ], + total: 1, + } as any); + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); + + expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + expect(savedObjectsClient.delete).not.toBeCalled(); + }); + + test('deletes a non persisted, abandoned session', async () => { + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [ + { + id: '123', + attributes: { + persisted: false, + status: SearchSessionStatus.IN_PROGRESS, + created: moment().subtract(moment.duration(3, 'm')), + touched: moment().subtract(moment.duration(2, 'm')), + idMapping: { + 'map-key': { + strategy: ENHANCED_ES_SEARCH_STRATEGY, + id: 'async-id', + }, + }, + }, }, + ], + total: 1, + } as any); + + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, }, - }, - }; - savedObjectsClient.find.mockResolvedValue({ - saved_objects: [so], - total: 1, - } as any); + config + ); - await checkRunningSessions(savedObjectsClient, mockClient, mockLogger); + expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + expect(savedObjectsClient.delete).toBeCalled(); - expect(mockClient.asyncSearch.status).not.toBeCalled(); - }); + expect(mockClient.asyncSearch.delete).toBeCalled(); + + const { id } = mockClient.asyncSearch.delete.mock.calls[0][0]; + expect(id).toBe('async-id'); + }); - test('updates to complete if the search is done', async () => { - savedObjectsClient.bulkUpdate = jest.fn(); - const so = { - attributes: { - idMapping: { - 'search-hash': { - id: 'search-id', - strategy: 'cool', - status: SearchStatus.IN_PROGRESS, + test('deletes a completed, not persisted session', async () => { + mockClient.asyncSearch.delete = jest.fn().mockResolvedValue(true); + + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [ + { + id: '123', + attributes: { + persisted: false, + status: SearchSessionStatus.COMPLETE, + created: moment().subtract(moment.duration(30, 'm')), + touched: moment().subtract(moment.duration(6, 'm')), + idMapping: { + 'map-key': { + strategy: ENHANCED_ES_SEARCH_STRATEGY, + id: 'async-id', + status: SearchStatus.COMPLETE, + }, + 'eql-map-key': { + strategy: EQL_SEARCH_STRATEGY, + id: 'eql-async-id', + status: SearchStatus.COMPLETE, + }, + }, + }, }, + ], + total: 1, + } as any); + + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, }, - }, - }; - savedObjectsClient.find.mockResolvedValue({ - saved_objects: [so], - total: 1, - } as any); + config + ); - mockClient.asyncSearch.status.mockResolvedValue({ - body: { - is_partial: false, - is_running: false, - completion_status: 200, - }, + expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + expect(savedObjectsClient.delete).toBeCalled(); + + expect(mockClient.asyncSearch.delete).toBeCalled(); + expect(mockClient.eql.delete).not.toBeCalled(); + + const { id } = mockClient.asyncSearch.delete.mock.calls[0][0]; + expect(id).toBe('async-id'); }); - await checkRunningSessions(savedObjectsClient, mockClient, mockLogger); + test('ignores errors thrown while deleting async searches', async () => { + mockClient.asyncSearch.delete = jest.fn().mockRejectedValueOnce(false); + + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [ + { + id: '123', + attributes: { + persisted: false, + status: SearchSessionStatus.COMPLETE, + created: moment().subtract(moment.duration(30, 'm')), + touched: moment().subtract(moment.duration(6, 'm')), + idMapping: { + 'map-key': { + strategy: ENHANCED_ES_SEARCH_STRATEGY, + id: 'async-id', + status: SearchStatus.COMPLETE, + }, + }, + }, + }, + ], + total: 1, + } as any); + + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); + + expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + expect(savedObjectsClient.delete).toBeCalled(); - expect(mockClient.asyncSearch.status).toBeCalledWith({ id: 'search-id' }); - const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0]; - const updatedAttributes = updateInput[0].attributes as SearchSessionSavedObjectAttributes; - expect(updatedAttributes.status).toBe(SearchSessionStatus.COMPLETE); - expect(updatedAttributes.idMapping['search-hash'].status).toBe(SearchStatus.COMPLETE); - expect(updatedAttributes.idMapping['search-hash'].error).toBeUndefined(); + expect(mockClient.asyncSearch.delete).toBeCalled(); + + const { id } = mockClient.asyncSearch.delete.mock.calls[0][0]; + expect(id).toBe('async-id'); + }); }); - test('updates to error if the search is errored', async () => { - savedObjectsClient.bulkUpdate = jest.fn(); - const so = { - attributes: { - idMapping: { - 'search-hash': { - id: 'search-id', - strategy: 'cool', - status: SearchStatus.IN_PROGRESS, + describe('update', () => { + test('does nothing if the search is still running', async () => { + const so = { + id: '123', + attributes: { + persisted: false, + status: SearchSessionStatus.IN_PROGRESS, + created: moment().subtract(moment.duration(3, 'm')), + touched: moment().subtract(moment.duration(10, 's')), + idMapping: { + 'search-hash': { + id: 'search-id', + strategy: 'cool', + status: SearchStatus.IN_PROGRESS, + }, }, }, - }, - }; - savedObjectsClient.find.mockResolvedValue({ - saved_objects: [so], - total: 1, - } as any); + }; + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [so], + total: 1, + } as any); - mockClient.asyncSearch.status.mockResolvedValue({ - body: { - is_partial: false, - is_running: false, - completion_status: 500, - }, + mockClient.asyncSearch.status.mockResolvedValue({ + body: { + is_partial: true, + is_running: true, + }, + }); + + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); + + expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + expect(savedObjectsClient.delete).not.toBeCalled(); }); - await checkRunningSessions(savedObjectsClient, mockClient, mockLogger); - const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0]; + test("doesn't re-check completed or errored searches", async () => { + savedObjectsClient.bulkUpdate = jest.fn(); + savedObjectsClient.delete = jest.fn(); + const so = { + id: '123', + attributes: { + status: SearchSessionStatus.ERROR, + idMapping: { + 'search-hash': { + id: 'search-id', + strategy: 'cool', + status: SearchStatus.COMPLETE, + }, + 'another-search-hash': { + id: 'search-id', + strategy: 'cool', + status: SearchStatus.ERROR, + }, + }, + }, + }; + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [so], + total: 1, + } as any); - const updatedAttributes = updateInput[0].attributes as SearchSessionSavedObjectAttributes; - expect(updatedAttributes.status).toBe(SearchSessionStatus.ERROR); - expect(updatedAttributes.idMapping['search-hash'].status).toBe(SearchStatus.ERROR); - expect(updatedAttributes.idMapping['search-hash'].error).toBe( - 'Search completed with a 500 status' - ); + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); + + expect(mockClient.asyncSearch.status).not.toBeCalled(); + expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + expect(savedObjectsClient.delete).not.toBeCalled(); + }); + + test('updates to complete if the search is done', async () => { + savedObjectsClient.bulkUpdate = jest.fn(); + const so = { + attributes: { + status: SearchSessionStatus.IN_PROGRESS, + touched: '123', + idMapping: { + 'search-hash': { + id: 'search-id', + strategy: 'cool', + status: SearchStatus.IN_PROGRESS, + }, + }, + }, + }; + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [so], + total: 1, + } as any); + + mockClient.asyncSearch.status.mockResolvedValue({ + body: { + is_partial: false, + is_running: false, + completion_status: 200, + }, + }); + + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); + + expect(mockClient.asyncSearch.status).toBeCalledWith({ id: 'search-id' }); + const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0]; + const updatedAttributes = updateInput[0].attributes as SearchSessionSavedObjectAttributes; + expect(updatedAttributes.status).toBe(SearchSessionStatus.COMPLETE); + expect(updatedAttributes.touched).not.toBe('123'); + expect(updatedAttributes.idMapping['search-hash'].status).toBe(SearchStatus.COMPLETE); + expect(updatedAttributes.idMapping['search-hash'].error).toBeUndefined(); + + expect(savedObjectsClient.delete).not.toBeCalled(); + }); + + test('updates to error if the search is errored', async () => { + savedObjectsClient.bulkUpdate = jest.fn(); + const so = { + attributes: { + idMapping: { + 'search-hash': { + id: 'search-id', + strategy: 'cool', + status: SearchStatus.IN_PROGRESS, + }, + }, + }, + }; + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [so], + total: 1, + } as any); + + mockClient.asyncSearch.status.mockResolvedValue({ + body: { + is_partial: false, + is_running: false, + completion_status: 500, + }, + }); + + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); + const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0]; + + const updatedAttributes = updateInput[0].attributes as SearchSessionSavedObjectAttributes; + expect(updatedAttributes.status).toBe(SearchSessionStatus.ERROR); + expect(updatedAttributes.idMapping['search-hash'].status).toBe(SearchStatus.ERROR); + expect(updatedAttributes.idMapping['search-hash'].error).toBe( + 'Search completed with a 500 status' + ); + }); }); }); diff --git a/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.ts b/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.ts index 71274e15e284d..7258b0ac124e8 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.ts @@ -10,93 +10,198 @@ import { SavedObjectsFindResult, SavedObjectsClientContract, } from 'kibana/server'; +import moment from 'moment'; +import { EMPTY, from } from 'rxjs'; +import { expand, mergeMap } from 'rxjs/operators'; +import { nodeBuilder } from '../../../../../../src/plugins/data/common'; import { SearchSessionStatus, SearchSessionSavedObjectAttributes, SearchSessionRequestInfo, + SEARCH_SESSION_TYPE, + ENHANCED_ES_SEARCH_STRATEGY, } from '../../../common'; -import { SEARCH_SESSION_TYPE } from '../../saved_objects'; import { getSearchStatus } from './get_search_status'; import { getSessionStatus } from './get_session_status'; -import { SearchStatus } from './types'; +import { SearchSessionsConfig, SearchStatus } from './types'; -export async function checkRunningSessions( - savedObjectsClient: SavedObjectsClientContract, +export interface CheckRunningSessionsDeps { + savedObjectsClient: SavedObjectsClientContract; + client: ElasticsearchClient; + logger: Logger; +} + +function isSessionStale( + session: SavedObjectsFindResult, + config: SearchSessionsConfig, + logger: Logger +) { + const curTime = moment(); + // Delete if a running session wasn't polled for in the last notTouchedInProgressTimeout OR + // if a completed \ errored \ canceled session wasn't saved for within notTouchedTimeout + return ( + (session.attributes.status === SearchSessionStatus.IN_PROGRESS && + curTime.diff(moment(session.attributes.touched), 'ms') > + config.notTouchedInProgressTimeout.asMilliseconds()) || + (session.attributes.status !== SearchSessionStatus.IN_PROGRESS && + curTime.diff(moment(session.attributes.touched), 'ms') > + config.notTouchedTimeout.asMilliseconds()) + ); +} + +async function updateSessionStatus( + session: SavedObjectsFindResult, client: ElasticsearchClient, logger: Logger +) { + let sessionUpdated = false; + + // Check statuses of all running searches + await Promise.all( + Object.keys(session.attributes.idMapping).map(async (searchKey: string) => { + const updateSearchRequest = ( + currentStatus: Pick + ) => { + sessionUpdated = true; + session.attributes.idMapping[searchKey] = { + ...session.attributes.idMapping[searchKey], + ...currentStatus, + }; + }; + + const searchInfo = session.attributes.idMapping[searchKey]; + if (searchInfo.status === SearchStatus.IN_PROGRESS) { + try { + const currentStatus = await getSearchStatus(client, searchInfo.id); + + if (currentStatus.status !== searchInfo.status) { + logger.debug(`search ${searchInfo.id} | status changed to ${currentStatus.status}`); + updateSearchRequest(currentStatus); + } + } catch (e) { + logger.error(e); + updateSearchRequest({ + status: SearchStatus.ERROR, + error: e.message || e.meta.error?.caused_by?.reason, + }); + } + } + }) + ); + + // And only then derive the session's status + const sessionStatus = getSessionStatus(session.attributes); + if (sessionStatus !== session.attributes.status) { + session.attributes.status = sessionStatus; + session.attributes.touched = new Date().toISOString(); + sessionUpdated = true; + } + + return sessionUpdated; +} + +function getSavedSearchSessionsPage$( + { savedObjectsClient, logger }: CheckRunningSessionsDeps, + config: SearchSessionsConfig, + page: number +) { + logger.debug(`Fetching saved search sessions page ${page}`); + return from( + savedObjectsClient.find({ + page, + perPage: config.pageSize, + type: SEARCH_SESSION_TYPE, + namespaces: ['*'], + filter: nodeBuilder.or([ + nodeBuilder.and([ + nodeBuilder.is( + `${SEARCH_SESSION_TYPE}.attributes.status`, + SearchSessionStatus.IN_PROGRESS.toString() + ), + nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.persisted`, 'true'), + ]), + nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.persisted`, 'false'), + ]), + }) + ); +} + +function getAllSavedSearchSessions$(deps: CheckRunningSessionsDeps, config: SearchSessionsConfig) { + return getSavedSearchSessionsPage$(deps, config, 1).pipe( + expand((result) => { + if (!result || !result.saved_objects || result.saved_objects.length < config.pageSize) + return EMPTY; + else { + return getSavedSearchSessionsPage$(deps, config, result.page + 1); + } + }) + ); +} + +export async function checkRunningSessions( + deps: CheckRunningSessionsDeps, + config: SearchSessionsConfig ): Promise { + const { logger, client, savedObjectsClient } = deps; try { - const runningSearchSessionsResponse = await savedObjectsClient.find( - { - type: SEARCH_SESSION_TYPE, - search: SearchSessionStatus.IN_PROGRESS.toString(), - searchFields: ['status'], - namespaces: ['*'], - } - ); - - if (!runningSearchSessionsResponse.total) return; - - logger.debug(`Found ${runningSearchSessionsResponse.total} running sessions`); - - const updatedSessions = new Array>(); - - let sessionUpdated = false; - - await Promise.all( - runningSearchSessionsResponse.saved_objects.map(async (session) => { - // Check statuses of all running searches - await Promise.all( - Object.keys(session.attributes.idMapping).map(async (searchKey: string) => { - const updateSearchRequest = ( - currentStatus: Pick - ) => { - sessionUpdated = true; - session.attributes.idMapping[searchKey] = { - ...session.attributes.idMapping[searchKey], - ...currentStatus, - }; - }; - - const searchInfo = session.attributes.idMapping[searchKey]; - if (searchInfo.status === SearchStatus.IN_PROGRESS) { - try { - const currentStatus = await getSearchStatus(client, searchInfo.id); - - if (currentStatus.status !== SearchStatus.IN_PROGRESS) { - updateSearchRequest(currentStatus); + await getAllSavedSearchSessions$(deps, config) + .pipe( + mergeMap(async (runningSearchSessionsResponse) => { + if (!runningSearchSessionsResponse.total) return; + + logger.debug(`Found ${runningSearchSessionsResponse.total} running sessions`); + + const updatedSessions = new Array< + SavedObjectsFindResult + >(); + + await Promise.all( + runningSearchSessionsResponse.saved_objects.map(async (session) => { + const updated = await updateSessionStatus(session, client, logger); + let deleted = false; + + if (!session.attributes.persisted) { + if (isSessionStale(session, config, logger)) { + deleted = true; + // delete saved object to free up memory + // TODO: there's a potential rare edge case of deleting an object and then receiving a new trackId for that same session! + // Maybe we want to change state to deleted and cleanup later? + logger.debug(`Deleting stale session | ${session.id}`); + await savedObjectsClient.delete(SEARCH_SESSION_TYPE, session.id); + + // Send a delete request for each async search to ES + Object.keys(session.attributes.idMapping).map(async (searchKey: string) => { + const searchInfo = session.attributes.idMapping[searchKey]; + if (searchInfo.strategy === ENHANCED_ES_SEARCH_STRATEGY) { + try { + await client.asyncSearch.delete({ id: searchInfo.id }); + } catch (e) { + logger.debug( + `Error ignored while deleting async_search ${searchInfo.id}: ${e.message}` + ); + } + } + }); } - } catch (e) { - logger.error(e); - updateSearchRequest({ - status: SearchStatus.ERROR, - error: e.message || e.meta.error?.caused_by?.reason, - }); } - } - }) - ); - - // And only then derive the session's status - const sessionStatus = getSessionStatus(session.attributes); - if (sessionStatus !== SearchSessionStatus.IN_PROGRESS) { - session.attributes.status = sessionStatus; - sessionUpdated = true; - } - if (sessionUpdated) { - updatedSessions.push(session); - } - }) - ); - - if (updatedSessions.length) { - // If there's an error, we'll try again in the next iteration, so there's no need to check the output. - const updatedResponse = await savedObjectsClient.bulkUpdate( - updatedSessions - ); - logger.debug(`Updated ${updatedResponse.saved_objects.length} background sessions`); - } + if (updated && !deleted) { + updatedSessions.push(session); + } + }) + ); + + // Do a bulk update + if (updatedSessions.length) { + // If there's an error, we'll try again in the next iteration, so there's no need to check the output. + const updatedResponse = await savedObjectsClient.bulkUpdate( + updatedSessions + ); + logger.debug(`Updated ${updatedResponse.saved_objects.length} search sessions`); + } + }) + ) + .toPromise(); } catch (err) { logger.error(err); } diff --git a/x-pack/plugins/data_enhanced/server/search/session/get_search_status.test.ts b/x-pack/plugins/data_enhanced/server/search/session/get_search_status.test.ts index e66ce613b71d9..c4eef0b3ddbb3 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/get_search_status.test.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/get_search_status.test.ts @@ -17,7 +17,7 @@ describe('getSearchStatus', () => { }; }); - test('returns an error status if search is partial and not running', () => { + test('returns an error status if search is partial and not running', async () => { mockClient.asyncSearch.status.mockResolvedValue({ body: { is_partial: true, @@ -25,10 +25,11 @@ describe('getSearchStatus', () => { completion_status: 200, }, }); - expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.ERROR); + const res = await getSearchStatus(mockClient, '123'); + expect(res.status).toBe(SearchStatus.ERROR); }); - test('returns an error status if completion_status is an error', () => { + test('returns an error status if completion_status is an error', async () => { mockClient.asyncSearch.status.mockResolvedValue({ body: { is_partial: false, @@ -36,10 +37,11 @@ describe('getSearchStatus', () => { completion_status: 500, }, }); - expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.ERROR); + const res = await getSearchStatus(mockClient, '123'); + expect(res.status).toBe(SearchStatus.ERROR); }); - test('returns an error status if gets an ES error', () => { + test('returns an error status if gets an ES error', async () => { mockClient.asyncSearch.status.mockResolvedValue({ error: { root_cause: { @@ -47,15 +49,17 @@ describe('getSearchStatus', () => { }, }, }); - expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.ERROR); + const res = await getSearchStatus(mockClient, '123'); + expect(res.status).toBe(SearchStatus.ERROR); }); - test('returns an error status throws', () => { + test('returns an error status throws', async () => { mockClient.asyncSearch.status.mockRejectedValue(new Error('O_o')); - expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.ERROR); + const res = await getSearchStatus(mockClient, '123'); + expect(res.status).toBe(SearchStatus.ERROR); }); - test('returns a complete status', () => { + test('returns a complete status', async () => { mockClient.asyncSearch.status.mockResolvedValue({ body: { is_partial: false, @@ -63,10 +67,11 @@ describe('getSearchStatus', () => { completion_status: 200, }, }); - expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.COMPLETE); + const res = await getSearchStatus(mockClient, '123'); + expect(res.status).toBe(SearchStatus.COMPLETE); }); - test('returns a running status otherwise', () => { + test('returns a running status otherwise', async () => { mockClient.asyncSearch.status.mockResolvedValue({ body: { is_partial: false, @@ -74,6 +79,7 @@ describe('getSearchStatus', () => { completion_status: undefined, }, }); - expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.IN_PROGRESS); + const res = await getSearchStatus(mockClient, '123'); + expect(res.status).toBe(SearchStatus.IN_PROGRESS); }); }); diff --git a/x-pack/plugins/data_enhanced/server/search/session/get_search_status.ts b/x-pack/plugins/data_enhanced/server/search/session/get_search_status.ts index e2b5fc0157b37..3e93ae4e056c7 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/get_search_status.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/get_search_status.ts @@ -16,27 +16,40 @@ export async function getSearchStatus( asyncId: string ): Promise> { // TODO: Handle strategies other than the default one - const apiResponse: ApiResponse = await client.asyncSearch.status({ - id: asyncId, - }); - const response = apiResponse.body; - if ((response.is_partial && !response.is_running) || response.completion_status >= 400) { + try { + const apiResponse: ApiResponse = await client.asyncSearch.status({ + id: asyncId, + }); + const response = apiResponse.body; + if ((response.is_partial && !response.is_running) || response.completion_status >= 400) { + return { + status: SearchStatus.ERROR, + error: i18n.translate('xpack.data.search.statusError', { + defaultMessage: `Search completed with a {errorCode} status`, + values: { errorCode: response.completion_status }, + }), + }; + } else if (!response.is_partial && !response.is_running) { + return { + status: SearchStatus.COMPLETE, + error: undefined, + }; + } else { + return { + status: SearchStatus.IN_PROGRESS, + error: undefined, + }; + } + } catch (e) { return { status: SearchStatus.ERROR, - error: i18n.translate('xpack.data.search.statusError', { - defaultMessage: `Search completed with a {errorCode} status`, - values: { errorCode: response.completion_status }, + error: i18n.translate('xpack.data.search.statusThrow', { + defaultMessage: `Search status threw an error {message} ({errorCode}) status`, + values: { + message: e.message, + errorCode: e.statusCode || 500, + }, }), }; - } else if (!response.is_partial && !response.is_running) { - return { - status: SearchStatus.COMPLETE, - error: undefined, - }; - } else { - return { - status: SearchStatus.IN_PROGRESS, - error: undefined, - }; } } diff --git a/x-pack/plugins/data_enhanced/server/search/session/monitoring_task.ts b/x-pack/plugins/data_enhanced/server/search/session/monitoring_task.ts index 332e69b119bb6..d9f3cdb8debe7 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/monitoring_task.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/monitoring_task.ts @@ -14,10 +14,10 @@ import { } from '../../../../task_manager/server'; import { checkRunningSessions } from './check_running_sessions'; import { CoreSetup, SavedObjectsClient, Logger } from '../../../../../../src/core/server'; -import { SEARCH_SESSION_TYPE } from '../../saved_objects'; import { ConfigSchema } from '../../../config'; +import { SEARCH_SESSION_TYPE } from '../../../common'; -export const SEARCH_SESSIONS_TASK_TYPE = 'bg_monitor'; +export const SEARCH_SESSIONS_TASK_TYPE = 'search_sessions_monitor'; export const SEARCH_SESSIONS_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_TASK_TYPE}`; interface SearchSessionTaskDeps { @@ -31,17 +31,20 @@ function searchSessionRunner(core: CoreSetup, { logger, config$ }: SearchSession return { async run() { const config = await config$.pipe(first()).toPromise(); + const sessionConfig = config.search.sessions; const [coreStart] = await core.getStartServices(); const internalRepo = coreStart.savedObjects.createInternalRepository([SEARCH_SESSION_TYPE]); const internalSavedObjectsClient = new SavedObjectsClient(internalRepo); await checkRunningSessions( - internalSavedObjectsClient, - coreStart.elasticsearch.client.asInternalUser, - logger + { + savedObjectsClient: internalSavedObjectsClient, + client: coreStart.elasticsearch.client.asInternalUser, + logger, + }, + sessionConfig ); return { - runAt: new Date(Date.now() + config.search.sessions.trackingInterval.asMilliseconds()), state: {}, }; }, diff --git a/x-pack/plugins/data_enhanced/server/search/session/session_service.test.ts b/x-pack/plugins/data_enhanced/server/search/session/session_service.test.ts index 38661ff352ffe..3c8e0e1dc7dce 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/session_service.test.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/session_service.test.ts @@ -5,11 +5,14 @@ */ import { BehaviorSubject } from 'rxjs'; -import type { SavedObject, SavedObjectsClientContract } from 'kibana/server'; +import { + SavedObject, + SavedObjectsClientContract, + SavedObjectsErrorHelpers, +} from '../../../../../../src/core/server'; import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks'; -import { SearchSessionStatus } from '../../../common'; -import { SEARCH_SESSION_TYPE } from '../../saved_objects'; -import { SearchSessionService, SessionInfo } from './session_service'; +import { SearchSessionStatus, SEARCH_SESSION_TYPE } from '../../../common'; +import { SearchSessionService } from './session_service'; import { createRequestHash } from './utils'; import moment from 'moment'; import { coreMock } from 'src/core/server/mocks'; @@ -17,7 +20,6 @@ import { ConfigSchema } from '../../../config'; // @ts-ignore import { taskManagerMock } from '../../../../task_manager/server/mocks'; -const INMEM_TRACKING_INTERVAL = 10000; const MAX_UPDATE_RETRIES = 3; const flushPromises = () => new Promise((resolve) => setImmediate(resolve)); @@ -26,67 +28,7 @@ describe('SearchSessionService', () => { let savedObjectsClient: jest.Mocked; let service: SearchSessionService; - const MOCK_SESSION_ID = 'session-id-mock'; - const MOCK_ASYNC_ID = '123456'; const MOCK_STRATEGY = 'ese'; - const MOCK_KEY_HASH = '608de49a4600dbb5b173492759792e4a'; - - const createMockInternalSavedObjectClient = ( - findSpy?: jest.SpyInstance, - bulkUpdateSpy?: jest.SpyInstance - ) => { - Object.defineProperty(service, 'internalSavedObjectsClient', { - get: () => { - const find = - findSpy || - (() => { - return { - saved_objects: [ - { - attributes: { - sessionId: MOCK_SESSION_ID, - idMapping: { - 'another-key': { - id: 'another-async-id', - strategy: 'another-strategy', - }, - }, - }, - id: MOCK_SESSION_ID, - version: '1', - }, - ], - }; - }); - - const bulkUpdate = - bulkUpdateSpy || - (() => { - return { - saved_objects: [], - }; - }); - return { - find, - bulkUpdate, - }; - }, - }); - }; - - const createMockIdMapping = ( - mapValues: any[], - insertTime?: moment.Moment, - retryCount?: number - ): Map => { - const fakeMap = new Map(); - fakeMap.set(MOCK_SESSION_ID, { - ids: new Map(mapValues), - insertTime: insertTime || moment(), - retryCount: retryCount || 0, - }); - return fakeMap; - }; const sessionId = 'd7170a35-7e2c-48d6-8dec-9a056721b489'; const mockSavedObject: SavedObject = { @@ -108,8 +50,9 @@ describe('SearchSessionService', () => { sessions: { enabled: true, pageSize: 10000, - inMemTimeout: moment.duration(1, 'm'), - maxUpdateRetries: 3, + notTouchedInProgressTimeout: moment.duration(1, 'm'), + notTouchedTimeout: moment.duration(2, 'm'), + maxUpdateRetries: MAX_UPDATE_RETRIES, defaultExpiration: moment.duration(7, 'd'), trackingInterval: moment.duration(10, 's'), management: {} as any, @@ -124,7 +67,6 @@ describe('SearchSessionService', () => { service = new SearchSessionService(mockLogger, config$); const coreStart = coreMock.createStart(); const mockTaskManager = taskManagerMock.createStart(); - jest.useFakeTimers(); await flushPromises(); await service.start(coreStart, { taskManager: mockTaskManager, @@ -133,19 +75,6 @@ describe('SearchSessionService', () => { afterEach(() => { service.stop(); - jest.useRealTimers(); - }); - - it('search throws if `name` is not provided', () => { - expect(() => service.save({ savedObjectsClient }, sessionId, {})).rejects.toMatchInlineSnapshot( - `[Error: Name is required]` - ); - }); - - it('save throws if `name` is not provided', () => { - expect(() => service.save({ savedObjectsClient }, sessionId, {})).rejects.toMatchInlineSnapshot( - `[Error: Name is required]` - ); }); it('get calls saved objects client', async () => { @@ -180,7 +109,7 @@ describe('SearchSessionService', () => { }); }); - it('update calls saved objects client', async () => { + it('update calls saved objects client with added touch time', async () => { const mockUpdateSavedObject = { ...mockSavedObject, attributes: {}, @@ -191,77 +120,202 @@ describe('SearchSessionService', () => { const response = await service.update({ savedObjectsClient }, sessionId, attributes); expect(response).toBe(mockUpdateSavedObject); - expect(savedObjectsClient.update).toHaveBeenCalledWith( - SEARCH_SESSION_TYPE, - sessionId, - attributes - ); + + const [type, id, callAttributes] = savedObjectsClient.update.mock.calls[0]; + + expect(type).toBe(SEARCH_SESSION_TYPE); + expect(id).toBe(sessionId); + expect(callAttributes).toHaveProperty('name', attributes.name); + expect(callAttributes).toHaveProperty('touched'); }); it('cancel updates object status', async () => { await service.cancel({ savedObjectsClient }, sessionId); + const [type, id, callAttributes] = savedObjectsClient.update.mock.calls[0]; - expect(savedObjectsClient.update).toHaveBeenCalledWith(SEARCH_SESSION_TYPE, sessionId, { - status: SearchSessionStatus.CANCELLED, - }); + expect(type).toBe(SEARCH_SESSION_TYPE); + expect(id).toBe(sessionId); + expect(callAttributes).toHaveProperty('status', SearchSessionStatus.CANCELLED); + expect(callAttributes).toHaveProperty('touched'); }); describe('trackId', () => { - it('stores hash in memory when `isStored` is `false` for when `save` is called', async () => { + it('updates the saved object if search session already exists', async () => { const searchRequest = { params: {} }; const requestHash = createRequestHash(searchRequest.params); const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0'; - const isStored = false; - const name = 'my saved background search session'; - const appId = 'my_app_id'; - const urlGeneratorId = 'my_url_generator_id'; - const created = new Date().toISOString(); - const expires = new Date().toISOString(); - - const mockIdMapping = createMockIdMapping([]); - const setSpy = jest.fn(); - mockIdMapping.set = setSpy; - Object.defineProperty(service, 'sessionSearchMap', { - get: () => mockIdMapping, + + const mockUpdateSavedObject = { + ...mockSavedObject, + attributes: {}, + }; + savedObjectsClient.update.mockResolvedValue(mockUpdateSavedObject); + + await service.trackId({ savedObjectsClient }, searchRequest, searchId, { + sessionId, + strategy: MOCK_STRATEGY, + }); + + expect(savedObjectsClient.update).toHaveBeenCalled(); + expect(savedObjectsClient.create).not.toHaveBeenCalled(); + + const [type, id, callAttributes] = savedObjectsClient.update.mock.calls[0]; + expect(type).toBe(SEARCH_SESSION_TYPE); + expect(id).toBe(sessionId); + expect(callAttributes).toHaveProperty('idMapping', { + [requestHash]: { + id: searchId, + status: SearchSessionStatus.IN_PROGRESS, + strategy: MOCK_STRATEGY, + }, + }); + expect(callAttributes).toHaveProperty('touched'); + }); + + it('retries updating the saved object if there was a ES conflict 409', async () => { + const searchRequest = { params: {} }; + const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0'; + + const mockUpdateSavedObject = { + ...mockSavedObject, + attributes: {}, + }; + + let counter = 0; + + savedObjectsClient.update.mockImplementation(() => { + return new Promise((resolve, reject) => { + if (counter === 0) { + counter++; + reject(SavedObjectsErrorHelpers.createConflictError(SEARCH_SESSION_TYPE, searchId)); + } else { + resolve(mockUpdateSavedObject); + } + }); + }); + + await service.trackId({ savedObjectsClient }, searchRequest, searchId, { + sessionId, + strategy: MOCK_STRATEGY, + }); + + expect(savedObjectsClient.update).toHaveBeenCalledTimes(2); + expect(savedObjectsClient.create).not.toHaveBeenCalled(); + }); + + it('retries updating the saved object if theres a ES conflict 409, but stops after MAX_RETRIES times', async () => { + const searchRequest = { params: {} }; + const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0'; + + savedObjectsClient.update.mockImplementation(() => { + return new Promise((resolve, reject) => { + reject(SavedObjectsErrorHelpers.createConflictError(SEARCH_SESSION_TYPE, searchId)); + }); }); await service.trackId({ savedObjectsClient }, searchRequest, searchId, { sessionId, - isStored, strategy: MOCK_STRATEGY, }); - expect(savedObjectsClient.update).not.toHaveBeenCalled(); + // Track ID doesn't throw errors even in cases of failure! + expect(savedObjectsClient.update).toHaveBeenCalledTimes(MAX_UPDATE_RETRIES); + expect(savedObjectsClient.create).not.toHaveBeenCalled(); + }); - await service.save({ savedObjectsClient }, sessionId, { - name, - created, - expires, - appId, - urlGeneratorId, + it('creates the saved object in non persisted state, if search session doesnt exists', async () => { + const searchRequest = { params: {} }; + const requestHash = createRequestHash(searchRequest.params); + const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0'; + + const mockCreatedSavedObject = { + ...mockSavedObject, + attributes: {}, + }; + + savedObjectsClient.update.mockRejectedValue( + SavedObjectsErrorHelpers.createGenericNotFoundError(sessionId) + ); + savedObjectsClient.create.mockResolvedValue(mockCreatedSavedObject); + + await service.trackId({ savedObjectsClient }, searchRequest, searchId, { + sessionId, + strategy: MOCK_STRATEGY, }); - expect(savedObjectsClient.create).toHaveBeenCalledWith( - SEARCH_SESSION_TYPE, - { - name, - created, - expires, - initialState: {}, - restoreState: {}, + expect(savedObjectsClient.update).toHaveBeenCalled(); + expect(savedObjectsClient.create).toHaveBeenCalled(); + + const [type, callAttributes, options] = savedObjectsClient.create.mock.calls[0]; + expect(type).toBe(SEARCH_SESSION_TYPE); + expect(options).toStrictEqual({ id: sessionId }); + expect(callAttributes).toHaveProperty('idMapping', { + [requestHash]: { + id: searchId, status: SearchSessionStatus.IN_PROGRESS, - idMapping: {}, - appId, - urlGeneratorId, - sessionId, + strategy: MOCK_STRATEGY, }, - { id: sessionId } + }); + expect(callAttributes).toHaveProperty('expires'); + expect(callAttributes).toHaveProperty('created'); + expect(callAttributes).toHaveProperty('touched'); + expect(callAttributes).toHaveProperty('sessionId', sessionId); + expect(callAttributes).toHaveProperty('persisted', false); + }); + + it('retries updating if update returned 404 and then update returned conflict 409 (first create race condition)', async () => { + const searchRequest = { params: {} }; + const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0'; + + const mockUpdateSavedObject = { + ...mockSavedObject, + attributes: {}, + }; + + let counter = 0; + + savedObjectsClient.update.mockImplementation(() => { + return new Promise((resolve, reject) => { + if (counter === 0) { + counter++; + reject(SavedObjectsErrorHelpers.createGenericNotFoundError(sessionId)); + } else { + resolve(mockUpdateSavedObject); + } + }); + }); + + savedObjectsClient.create.mockRejectedValue( + SavedObjectsErrorHelpers.createConflictError(SEARCH_SESSION_TYPE, searchId) ); - const [setSessionId, setParams] = setSpy.mock.calls[0]; - expect(setParams.ids.get(requestHash).id).toBe(searchId); - expect(setParams.ids.get(requestHash).strategy).toBe(MOCK_STRATEGY); - expect(setSessionId).toBe(sessionId); + await service.trackId({ savedObjectsClient }, searchRequest, searchId, { + sessionId, + strategy: MOCK_STRATEGY, + }); + + expect(savedObjectsClient.update).toHaveBeenCalledTimes(2); + expect(savedObjectsClient.create).toHaveBeenCalledTimes(1); + }); + + it('retries everything at most MAX_RETRIES times', async () => { + const searchRequest = { params: {} }; + const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0'; + + savedObjectsClient.update.mockRejectedValue( + SavedObjectsErrorHelpers.createGenericNotFoundError(sessionId) + ); + savedObjectsClient.create.mockRejectedValue( + SavedObjectsErrorHelpers.createConflictError(SEARCH_SESSION_TYPE, searchId) + ); + + await service.trackId({ savedObjectsClient }, searchRequest, searchId, { + sessionId, + strategy: MOCK_STRATEGY, + }); + + expect(savedObjectsClient.update).toHaveBeenCalledTimes(MAX_UPDATE_RETRIES); + expect(savedObjectsClient.create).toHaveBeenCalledTimes(MAX_UPDATE_RETRIES); }); }); @@ -361,194 +415,87 @@ describe('SearchSessionService', () => { }); }); - describe('Monitor', () => { - it('schedules the next iteration', async () => { - const findSpy = jest.fn().mockResolvedValue({ saved_objects: [] }); - createMockInternalSavedObjectClient(findSpy); - - const mockIdMapping = createMockIdMapping( - [[MOCK_KEY_HASH, { id: MOCK_ASYNC_ID, strategy: MOCK_STRATEGY }]], - moment() + describe('save', () => { + it('save throws if `name` is not provided', () => { + expect(service.save({ savedObjectsClient }, sessionId, {})).rejects.toMatchInlineSnapshot( + `[Error: Name is required]` ); - - Object.defineProperty(service, 'sessionSearchMap', { - get: () => mockIdMapping, - }); - - jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL); - expect(findSpy).toHaveBeenCalledTimes(1); - await flushPromises(); - - jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL); - expect(findSpy).toHaveBeenCalledTimes(2); }); - it('should delete expired IDs', async () => { - const findSpy = jest.fn().mockResolvedValueOnce({ saved_objects: [] }); - createMockInternalSavedObjectClient(findSpy); - - const mockIdMapping = createMockIdMapping( - [[MOCK_KEY_HASH, { id: MOCK_ASYNC_ID, strategy: MOCK_STRATEGY }]], - moment().subtract(2, 'm') - ); - - const deleteSpy = jest.spyOn(mockIdMapping, 'delete'); - Object.defineProperty(service, 'sessionSearchMap', { - get: () => mockIdMapping, - }); - - // Get setInterval to fire - jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL); - - expect(findSpy).not.toHaveBeenCalled(); - expect(deleteSpy).toHaveBeenCalledTimes(1); + it('save throws if `appId` is not provided', () => { + expect( + service.save({ savedObjectsClient }, sessionId, { name: 'banana' }) + ).rejects.toMatchInlineSnapshot(`[Error: AppId is required]`); }); - it('should delete IDs that passed max retries', async () => { - const findSpy = jest.fn().mockResolvedValueOnce({ saved_objects: [] }); - createMockInternalSavedObjectClient(findSpy); - - const mockIdMapping = createMockIdMapping( - [[MOCK_KEY_HASH, { id: MOCK_ASYNC_ID, strategy: MOCK_STRATEGY }]], - moment(), - MAX_UPDATE_RETRIES - ); - - const deleteSpy = jest.spyOn(mockIdMapping, 'delete'); - Object.defineProperty(service, 'sessionSearchMap', { - get: () => mockIdMapping, - }); - - // Get setInterval to fire - jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL); - - expect(findSpy).not.toHaveBeenCalled(); - expect(deleteSpy).toHaveBeenCalledTimes(1); + it('save throws if `generator id` is not provided', () => { + expect( + service.save({ savedObjectsClient }, sessionId, { name: 'banana', appId: 'nanana' }) + ).rejects.toMatchInlineSnapshot(`[Error: UrlGeneratorId is required]`); }); - it('should not fetch when no IDs are mapped', async () => { - const findSpy = jest.fn().mockResolvedValueOnce({ saved_objects: [] }); - createMockInternalSavedObjectClient(findSpy); - - jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL); - expect(findSpy).not.toHaveBeenCalled(); - }); + it('saving updates an existing saved object and persists it', async () => { + const mockUpdateSavedObject = { + ...mockSavedObject, + attributes: {}, + }; + savedObjectsClient.update.mockResolvedValue(mockUpdateSavedObject); - it('should try to fetch saved objects if some ids are mapped', async () => { - const mockIdMapping = createMockIdMapping([[MOCK_KEY_HASH, MOCK_ASYNC_ID]]); - Object.defineProperty(service, 'sessionSearchMap', { - get: () => mockIdMapping, + await service.save({ savedObjectsClient }, sessionId, { + name: 'banana', + appId: 'nanana', + urlGeneratorId: 'panama', }); - const findSpy = jest.fn().mockResolvedValueOnce({ saved_objects: [] }); - const bulkUpdateSpy = jest.fn().mockResolvedValueOnce({ saved_objects: [] }); - createMockInternalSavedObjectClient(findSpy, bulkUpdateSpy); - - jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL); - expect(findSpy).toHaveBeenCalledTimes(1); - expect(bulkUpdateSpy).not.toHaveBeenCalled(); + expect(savedObjectsClient.update).toHaveBeenCalled(); + expect(savedObjectsClient.create).not.toHaveBeenCalled(); + + const [type, id, callAttributes] = savedObjectsClient.update.mock.calls[0]; + expect(type).toBe(SEARCH_SESSION_TYPE); + expect(id).toBe(sessionId); + expect(callAttributes).not.toHaveProperty('idMapping'); + expect(callAttributes).toHaveProperty('touched'); + expect(callAttributes).toHaveProperty('persisted', true); + expect(callAttributes).toHaveProperty('name', 'banana'); + expect(callAttributes).toHaveProperty('appId', 'nanana'); + expect(callAttributes).toHaveProperty('urlGeneratorId', 'panama'); + expect(callAttributes).toHaveProperty('initialState', {}); + expect(callAttributes).toHaveProperty('restoreState', {}); }); - it('should update saved objects if they are found, and delete session on success', async () => { - const mockIdMapping = createMockIdMapping([[MOCK_KEY_HASH, MOCK_ASYNC_ID]], undefined, 1); - const mockMapDeleteSpy = jest.fn(); - const mockSessionDeleteSpy = jest.fn(); - mockIdMapping.delete = mockMapDeleteSpy; - mockIdMapping.get(MOCK_SESSION_ID)!.ids.delete = mockSessionDeleteSpy; - Object.defineProperty(service, 'sessionSearchMap', { - get: () => mockIdMapping, - }); - - const findSpy = jest.fn().mockResolvedValueOnce({ - saved_objects: [ - { - id: MOCK_SESSION_ID, - attributes: { - idMapping: { - b: 'c', - }, - }, - }, - ], - }); - const bulkUpdateSpy = jest.fn().mockResolvedValueOnce({ - saved_objects: [ - { - id: MOCK_SESSION_ID, - attributes: { - idMapping: { - b: 'c', - [MOCK_KEY_HASH]: { - id: MOCK_ASYNC_ID, - strategy: MOCK_STRATEGY, - }, - }, - }, - }, - ], - }); - createMockInternalSavedObjectClient(findSpy, bulkUpdateSpy); - - jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL); - - // Release timers to call check after test actions are done. - jest.useRealTimers(); - await new Promise((r) => setTimeout(r, 15)); - - expect(findSpy).toHaveBeenCalledTimes(1); - expect(bulkUpdateSpy).toHaveBeenCalledTimes(1); - expect(mockSessionDeleteSpy).toHaveBeenCalledTimes(2); - expect(mockSessionDeleteSpy).toBeCalledWith('b'); - expect(mockSessionDeleteSpy).toBeCalledWith(MOCK_KEY_HASH); - expect(mockMapDeleteSpy).toBeCalledTimes(1); - }); + it('saving creates a new persisted saved object, if it did not exist', async () => { + const mockCreatedSavedObject = { + ...mockSavedObject, + attributes: {}, + }; - it('should update saved objects if they are found, and increase retryCount on error', async () => { - const mockIdMapping = createMockIdMapping([[MOCK_KEY_HASH, MOCK_ASYNC_ID]]); - const mockMapDeleteSpy = jest.fn(); - const mockSessionDeleteSpy = jest.fn(); - mockIdMapping.delete = mockMapDeleteSpy; - mockIdMapping.get(MOCK_SESSION_ID)!.ids.delete = mockSessionDeleteSpy; - Object.defineProperty(service, 'sessionSearchMap', { - get: () => mockIdMapping, - }); + savedObjectsClient.update.mockRejectedValue( + SavedObjectsErrorHelpers.createGenericNotFoundError(sessionId) + ); + savedObjectsClient.create.mockResolvedValue(mockCreatedSavedObject); - const findSpy = jest.fn().mockResolvedValueOnce({ - saved_objects: [ - { - id: MOCK_SESSION_ID, - attributes: { - idMapping: { - b: { - id: 'c', - strategy: MOCK_STRATEGY, - }, - }, - }, - }, - ], - }); - const bulkUpdateSpy = jest.fn().mockResolvedValueOnce({ - saved_objects: [ - { - id: MOCK_SESSION_ID, - error: 'not ok', - }, - ], + await service.save({ savedObjectsClient }, sessionId, { + name: 'banana', + appId: 'nanana', + urlGeneratorId: 'panama', }); - createMockInternalSavedObjectClient(findSpy, bulkUpdateSpy); - - jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL); - - // Release timers to call check after test actions are done. - jest.useRealTimers(); - await new Promise((r) => setTimeout(r, 15)); - expect(findSpy).toHaveBeenCalledTimes(1); - expect(bulkUpdateSpy).toHaveBeenCalledTimes(1); - expect(mockSessionDeleteSpy).not.toHaveBeenCalled(); - expect(mockMapDeleteSpy).not.toHaveBeenCalled(); - expect(mockIdMapping.get(MOCK_SESSION_ID)!.retryCount).toBe(1); + expect(savedObjectsClient.update).toHaveBeenCalledTimes(1); + expect(savedObjectsClient.create).toHaveBeenCalledTimes(1); + + const [type, callAttributes, options] = savedObjectsClient.create.mock.calls[0]; + expect(type).toBe(SEARCH_SESSION_TYPE); + expect(options?.id).toBe(sessionId); + expect(callAttributes).toHaveProperty('idMapping', {}); + expect(callAttributes).toHaveProperty('touched'); + expect(callAttributes).toHaveProperty('expires'); + expect(callAttributes).toHaveProperty('created'); + expect(callAttributes).toHaveProperty('persisted', true); + expect(callAttributes).toHaveProperty('name', 'banana'); + expect(callAttributes).toHaveProperty('appId', 'nanana'); + expect(callAttributes).toHaveProperty('urlGeneratorId', 'panama'); + expect(callAttributes).toHaveProperty('initialState', {}); + expect(callAttributes).toHaveProperty('restoreState', {}); }); }); }); diff --git a/x-pack/plugins/data_enhanced/server/search/session/session_service.ts b/x-pack/plugins/data_enhanced/server/search/session/session_service.ts index 03466c769d9be..8496a0513caeb 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/session_service.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/session_service.ts @@ -4,26 +4,19 @@ * you may not use this file except in compliance with the Elastic License. */ -import moment, { Moment } from 'moment'; import { Observable } from 'rxjs'; import { first } from 'rxjs/operators'; import { CoreSetup, CoreStart, KibanaRequest, + SavedObjectsClientContract, Logger, SavedObject, - SavedObjectsBulkUpdateObject, - SavedObjectsClient, - SavedObjectsClientContract, SavedObjectsFindOptions, + SavedObjectsErrorHelpers, } from '../../../../../../src/core/server'; -import { - IKibanaSearchRequest, - ISearchOptions, - KueryNode, - nodeBuilder, -} from '../../../../../../src/plugins/data/common'; +import { IKibanaSearchRequest, ISearchOptions } from '../../../../../../src/plugins/data/common'; import { ISearchSessionService } from '../../../../../../src/plugins/data/server'; import { TaskManagerSetupContract, @@ -33,23 +26,16 @@ import { SearchSessionRequestInfo, SearchSessionSavedObjectAttributes, SearchSessionStatus, + SEARCH_SESSION_TYPE, } from '../../../common'; -import { SEARCH_SESSION_TYPE } from '../../saved_objects'; import { createRequestHash } from './utils'; import { ConfigSchema } from '../../../config'; import { registerSearchSessionsTask, scheduleSearchSessionsTasks } from './monitoring_task'; -import { SearchStatus } from './types'; +import { SearchSessionsConfig, SearchStatus } from './types'; export interface SearchSessionDependencies { savedObjectsClient: SavedObjectsClientContract; } - -export interface SessionInfo { - insertTime: Moment; - retryCount: number; - ids: Map; -} - interface SetupDependencies { taskManager: TaskManagerSetupContract; } @@ -58,20 +44,11 @@ interface StartDependencies { taskManager: TaskManagerStartContract; } -type SearchSessionsConfig = ConfigSchema['search']['sessions']; - -/** - * @internal - */ +function sleep(ms: number) { + return new Promise((r) => setTimeout(r, ms)); +} export class SearchSessionService implements ISearchSessionService { - /** - * Map of sessionId to { [requestHash]: searchId } - * @private - */ - private sessionSearchMap = new Map(); - private internalSavedObjectsClient!: SavedObjectsClientContract; - private monitorTimer!: NodeJS.Timeout; private config!: SearchSessionsConfig; constructor( @@ -93,149 +70,68 @@ export class SearchSessionService return this.setupMonitoring(core, deps); } - public stop() { - this.sessionSearchMap.clear(); - clearTimeout(this.monitorTimer); - } + public stop() {} private setupMonitoring = async (core: CoreStart, deps: StartDependencies) => { if (this.config.enabled) { scheduleSearchSessionsTasks(deps.taskManager, this.logger, this.config.trackingInterval); - this.logger.debug(`setupMonitoring | Enabling monitoring`); - const internalRepo = core.savedObjects.createInternalRepository([SEARCH_SESSION_TYPE]); - this.internalSavedObjectsClient = new SavedObjectsClient(internalRepo); - this.monitorMappedIds(); } }; - /** - * Compiles a KQL Query to fetch sessions by ID. - * Done as a performance optimization workaround. - */ - private sessionIdsAsFilters(sessionIds: string[]): KueryNode { - return nodeBuilder.or( - sessionIds.map((id) => { - return nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.sessionId`, id); - }) - ); - } - - /** - * Gets all {@link SessionSavedObjectAttributes | Background Searches} that - * currently being tracked by the service. - * - * @remarks - * Uses `internalSavedObjectsClient` as this is called asynchronously, not within the - * context of a user's session. - */ - private async getAllMappedSavedObjects() { - const filter = this.sessionIdsAsFilters(Array.from(this.sessionSearchMap.keys())); - const res = await this.internalSavedObjectsClient.find({ - perPage: this.config.pageSize, // If there are more sessions in memory, they will be synced when some items are cleared out. - type: SEARCH_SESSION_TYPE, - filter, - namespaces: ['*'], - }); - this.logger.debug(`getAllMappedSavedObjects | Got ${res.saved_objects.length} items`); - return res.saved_objects; - } - - private clearSessions = async () => { - const curTime = moment(); - - this.sessionSearchMap.forEach((sessionInfo, sessionId) => { - if ( - moment.duration(curTime.diff(sessionInfo.insertTime)).asMilliseconds() > - this.config.inMemTimeout.asMilliseconds() - ) { - this.logger.debug(`clearSessions | Deleting expired session ${sessionId}`); - this.sessionSearchMap.delete(sessionId); - } else if (sessionInfo.retryCount >= this.config.maxUpdateRetries) { - this.logger.warn(`clearSessions | Deleting failed session ${sessionId}`); - this.sessionSearchMap.delete(sessionId); - } - }); - }; - - private async monitorMappedIds() { - this.monitorTimer = setTimeout(async () => { - try { - this.clearSessions(); - - if (!this.sessionSearchMap.size) return; - this.logger.debug(`monitorMappedIds | Map contains ${this.sessionSearchMap.size} items`); - - const savedSessions = await this.getAllMappedSavedObjects(); - const updatedSessions = await this.updateAllSavedObjects(savedSessions); + private updateOrCreate = async ( + deps: SearchSessionDependencies, + sessionId: string, + attributes: Partial, + retry: number = 1 + ): Promise | undefined> => { + const retryOnConflict = async (e: any) => { + this.logger.debug(`Conflict error | ${sessionId}`); + // Randomize sleep to spread updates out in case of conflicts + await sleep(100 + Math.random() * 50); + return await this.updateOrCreate(deps, sessionId, attributes, retry + 1); + }; - updatedSessions.forEach((updatedSavedObject) => { - const sessionInfo = this.sessionSearchMap.get(updatedSavedObject.id)!; - if (updatedSavedObject.error) { - this.logger.warn( - `monitorMappedIds | update error ${JSON.stringify(updatedSavedObject.error) || ''}` - ); - // Retry next time - sessionInfo.retryCount++; - } else if (updatedSavedObject.attributes.idMapping) { - // Delete the ids that we just saved, avoiding a potential new ids being lost. - Object.keys(updatedSavedObject.attributes.idMapping).forEach((key) => { - sessionInfo.ids.delete(key); - }); - // If the session object is empty, delete it as well - if (!sessionInfo.ids.entries.length) { - this.sessionSearchMap.delete(updatedSavedObject.id); - } else { - sessionInfo.retryCount = 0; - } + this.logger.debug(`updateOrCreate | ${sessionId} | ${retry}`); + try { + return (await this.update( + deps, + sessionId, + attributes + )) as SavedObject; + } catch (e) { + if (SavedObjectsErrorHelpers.isNotFoundError(e)) { + try { + this.logger.debug(`Object not found | ${sessionId}`); + return await this.create(deps, sessionId, attributes); + } catch (createError) { + if ( + SavedObjectsErrorHelpers.isConflictError(createError) && + retry < this.config.maxUpdateRetries + ) { + return await retryOnConflict(createError); + } else { + this.logger.error(createError); } - }); - } catch (e) { - this.logger.error(`monitorMappedIds | Error while updating sessions. ${e}`); - } finally { - this.monitorMappedIds(); + } + } else if ( + SavedObjectsErrorHelpers.isConflictError(e) && + retry < this.config.maxUpdateRetries + ) { + return await retryOnConflict(e); + } else { + this.logger.error(e); } - }, this.config.trackingInterval.asMilliseconds()); - } - - private async updateAllSavedObjects( - activeMappingObjects: Array> - ) { - if (!activeMappingObjects.length) return []; - - this.logger.debug(`updateAllSavedObjects | Updating ${activeMappingObjects.length} items`); - const updatedSessions: Array< - SavedObjectsBulkUpdateObject - > = activeMappingObjects - .filter((so) => !so.error) - .map((sessionSavedObject) => { - const sessionInfo = this.sessionSearchMap.get(sessionSavedObject.id); - const idMapping = sessionInfo ? Object.fromEntries(sessionInfo.ids.entries()) : {}; - sessionSavedObject.attributes.idMapping = { - ...sessionSavedObject.attributes.idMapping, - ...idMapping, - }; - return { - ...sessionSavedObject, - namespace: sessionSavedObject.namespaces?.[0], - }; - }); + } - const updateResults = await this.internalSavedObjectsClient.bulkUpdate( - updatedSessions - ); - return updateResults.saved_objects; - } + return undefined; + }; - // TODO: Generate the `userId` from the realm type/realm name/username public save = async ( - { savedObjectsClient }: SearchSessionDependencies, + deps: SearchSessionDependencies, sessionId: string, { name, appId, - created = new Date().toISOString(), - expires = new Date(Date.now() + this.config.defaultExpiration.asMilliseconds()).toISOString(), - status = SearchSessionStatus.IN_PROGRESS, urlGeneratorId, initialState = {}, restoreState = {}, @@ -245,27 +141,38 @@ export class SearchSessionService if (!appId) throw new Error('AppId is required'); if (!urlGeneratorId) throw new Error('UrlGeneratorId is required'); - this.logger.debug(`save | ${sessionId}`); - - const attributes = { + return this.updateOrCreate(deps, sessionId, { name, - created, - expires, - status, + appId, + urlGeneratorId, initialState, restoreState, - idMapping: {}, - urlGeneratorId, - appId, - sessionId, - }; - const session = await savedObjectsClient.create( + persisted: true, + }); + }; + + private create = ( + { savedObjectsClient }: SearchSessionDependencies, + sessionId: string, + attributes: Partial + ) => { + this.logger.debug(`create | ${sessionId}`); + return savedObjectsClient.create( SEARCH_SESSION_TYPE, - attributes, + { + sessionId, + status: SearchSessionStatus.IN_PROGRESS, + expires: new Date( + Date.now() + this.config.defaultExpiration.asMilliseconds() + ).toISOString(), + created: new Date().toISOString(), + touched: new Date().toISOString(), + idMapping: {}, + persisted: false, + ...attributes, + }, { id: sessionId } ); - - return session; }; // TODO: Throw an error if this session doesn't belong to this user @@ -298,7 +205,10 @@ export class SearchSessionService return savedObjectsClient.update( SEARCH_SESSION_TYPE, sessionId, - attributes + { + ...attributes, + touched: new Date().toISOString(), + } ); }; @@ -316,8 +226,7 @@ export class SearchSessionService }; /** - * Tracks the given search request/search ID in the saved session (if it exists). Otherwise, just - * store it in memory until a saved session exists. + * Tracks the given search request/search ID in the saved session. * @internal */ public trackId = async ( @@ -328,21 +237,20 @@ export class SearchSessionService ) => { if (!sessionId || !searchId) return; this.logger.debug(`trackId | ${sessionId} | ${searchId}`); - const requestHash = createRequestHash(searchRequest.params); - const searchInfo = { - id: searchId, - strategy: strategy!, - status: SearchStatus.IN_PROGRESS, - }; - // Update the in-memory mapping for this session for when the session is saved. - const map = this.sessionSearchMap.get(sessionId) ?? { - insertTime: moment(), - retryCount: 0, - ids: new Map(), - }; - map.ids.set(requestHash, searchInfo); - this.sessionSearchMap.set(sessionId, map); + let idMapping: Record = {}; + + if (searchRequest.params) { + const requestHash = createRequestHash(searchRequest.params); + const searchInfo = { + id: searchId, + strategy: strategy!, + status: SearchStatus.IN_PROGRESS, + }; + idMapping = { [requestHash]: searchInfo }; + } + + await this.updateOrCreate(deps, sessionId, { idMapping }); }; public async getSearchIdMapping(deps: SearchSessionDependencies, sessionId: string) { @@ -375,8 +283,10 @@ export class SearchSessionService const session = await this.get(deps, sessionId); const requestHash = createRequestHash(searchRequest.params); if (!session.attributes.idMapping.hasOwnProperty(requestHash)) { + this.logger.error(`getId | ${sessionId} | ${requestHash} not found`); throw new Error('No search ID in this session matching the given search request'); } + this.logger.debug(`getId | ${sessionId} | ${requestHash}`); return session.attributes.idMapping[requestHash].id; }; diff --git a/x-pack/plugins/data_enhanced/server/search/session/types.ts b/x-pack/plugins/data_enhanced/server/search/session/types.ts index c30e03f70d2dc..136c37942cb2e 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/types.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/types.ts @@ -4,8 +4,12 @@ * you may not use this file except in compliance with the Elastic License. */ +import { ConfigSchema } from '../../../config'; + export enum SearchStatus { IN_PROGRESS = 'in_progress', ERROR = 'error', COMPLETE = 'complete', } + +export type SearchSessionsConfig = ConfigSchema['search']['sessions']; diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts index 21c9f429814ca..ed7c48d046dc2 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts @@ -775,6 +775,125 @@ describe('padBuckets', () => { }) ).toEqual([0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); }); + + test('supports histogram buckets that begin in the past when tasks are overdue', async () => { + expect( + padBuckets(20, 3000, { + key: '2021-02-02T10:08:32.161Z-2021-02-02T10:09:32.161Z', + from: 1612260512161, + from_as_string: '2021-02-02T10:08:32.161Z', + to: 1612260572161, + to_as_string: '2021-02-02T10:09:32.161Z', + doc_count: 2, + histogram: { + buckets: [ + { + key_as_string: '2021-02-02T10:08:30.000Z', + key: 1612260510000, + doc_count: 1, + interval: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [ + { + key: '2s', + doc_count: 1, + }, + ], + }, + }, + { + key_as_string: '2021-02-02T10:08:33.000Z', + key: 1612260513000, + doc_count: 0, + interval: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [], + }, + }, + { + key_as_string: '2021-02-02T10:08:36.000Z', + key: 1612260516000, + doc_count: 0, + interval: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [], + }, + }, + { + key_as_string: '2021-02-02T10:08:39.000Z', + key: 1612260519000, + doc_count: 0, + interval: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [], + }, + }, + { + key_as_string: '2021-02-02T10:08:42.000Z', + key: 1612260522000, + doc_count: 0, + interval: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [], + }, + }, + { + key_as_string: '2021-02-02T10:08:45.000Z', + key: 1612260525000, + doc_count: 0, + interval: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [], + }, + }, + { + key_as_string: '2021-02-02T10:08:48.000Z', + key: 1612260528000, + doc_count: 0, + interval: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [], + }, + }, + { + key_as_string: '2021-02-02T10:08:51.000Z', + key: 1612260531000, + doc_count: 0, + interval: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [], + }, + }, + { + key_as_string: '2021-02-02T10:08:54.000Z', + key: 1612260534000, + doc_count: 1, + interval: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [ + { + key: '60s', + doc_count: 1, + }, + ], + }, + }, + ], + }, + }).length + // we need to ensure overdue buckets don't cause us to over pad the timeline by adding additional + // buckets before and after the reported timeline + ).toEqual(20); + }); }); function setTaskTypeCount( diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts index 8002ee44d01ff..8bd22bd88cf41 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts @@ -244,10 +244,19 @@ export function padBuckets( const firstBucket = histogram.buckets[0].key; const lastBucket = histogram.buckets[histogram.buckets.length - 1].key; - const bucketsToPadBeforeFirstBucket = calculateBucketsBetween(firstBucket, from, pollInterval); + // detect when the first bucket is before the `from` so that we can take that into + // account by begining the timeline earlier + // This can happen when you have overdue tasks and Elasticsearch returns their bucket + // as begining before the `from` + const firstBucketStartsInThePast = firstBucket - from < 0; + + const bucketsToPadBeforeFirstBucket = firstBucketStartsInThePast + ? [] + : calculateBucketsBetween(firstBucket, from, pollInterval); + const bucketsToPadAfterLast = calculateBucketsBetween( lastBucket + pollInterval, - to, + firstBucketStartsInThePast ? to - pollInterval : to, pollInterval ); diff --git a/x-pack/test/api_integration/apis/search/session.ts b/x-pack/test/api_integration/apis/search/session.ts index 96d601a00ff36..ad3504e756a09 100644 --- a/x-pack/test/api_integration/apis/search/session.ts +++ b/x-pack/test/api_integration/apis/search/session.ts @@ -13,6 +13,20 @@ export default function ({ getService }: FtrProviderContext) { describe('search session', () => { describe('session management', () => { + it('should fail to create a session with no name', async () => { + const sessionId = `my-session-${Math.random()}`; + await supertest + .post(`/internal/session`) + .set('kbn-xsrf', 'foo') + .send({ + sessionId, + appId: 'discover', + expires: '123', + urlGeneratorId: 'discover', + }) + .expect(400); + }); + it('should create and get a session', async () => { const sessionId = `my-session-${Math.random()}`; await supertest @@ -59,7 +73,7 @@ export default function ({ getService }: FtrProviderContext) { expect(status).to.equal(SearchSessionStatus.CANCELLED); }); - it('should sync search ids into session', async () => { + it('should sync search ids into persisted session', async () => { const sessionId = `my-session-${Math.random()}`; // run search @@ -83,7 +97,7 @@ export default function ({ getService }: FtrProviderContext) { const { id: id1 } = searchRes1.body; - // create session + // persist session await supertest .post(`/internal/session`) .set('kbn-xsrf', 'foo') @@ -115,16 +129,16 @@ export default function ({ getService }: FtrProviderContext) { const { id: id2 } = searchRes2.body; - // wait 10 seconds for ids to be synced - // TODO: make the refresh interval dynamic, so we can speed it up! - await new Promise((resolve) => setTimeout(resolve, 10000)); - const resp = await supertest .get(`/internal/session/${sessionId}`) .set('kbn-xsrf', 'foo') .expect(200); - const { idMapping } = resp.body.attributes; + const { name, touched, created, persisted, idMapping } = resp.body.attributes; + expect(persisted).to.be(true); + expect(name).to.be('My Session'); + expect(touched).not.to.be(undefined); + expect(created).not.to.be(undefined); const idMappings = Object.values(idMapping).map((value: any) => value.id); expect(idMappings).to.contain(id1); @@ -164,5 +178,119 @@ export default function ({ getService }: FtrProviderContext) { }) .expect(404); }); + + it('should sync search ids into not persisted session', async () => { + const sessionId = `my-session-${Math.random()}`; + + // run search + const searchRes1 = await supertest + .post(`/internal/search/ese`) + .set('kbn-xsrf', 'foo') + .send({ + sessionId, + params: { + body: { + query: { + term: { + agent: '1', + }, + }, + }, + wait_for_completion_timeout: '1ms', + }, + }) + .expect(200); + + const { id: id1 } = searchRes1.body; + + // run search + const searchRes2 = await supertest + .post(`/internal/search/ese`) + .set('kbn-xsrf', 'foo') + .send({ + sessionId, + params: { + body: { + query: { + match_all: {}, + }, + }, + wait_for_completion_timeout: '1ms', + }, + }) + .expect(200); + + const { id: id2 } = searchRes2.body; + + const resp = await supertest + .get(`/internal/session/${sessionId}`) + .set('kbn-xsrf', 'foo') + .expect(200); + + const { appId, name, touched, created, persisted, idMapping } = resp.body.attributes; + expect(persisted).to.be(false); + expect(name).to.be(undefined); + expect(appId).to.be(undefined); + expect(touched).not.to.be(undefined); + expect(created).not.to.be(undefined); + + const idMappings = Object.values(idMapping).map((value: any) => value.id); + expect(idMappings).to.contain(id1); + expect(idMappings).to.contain(id2); + }); + + it('touched time updates when you poll on an search', async () => { + const sessionId = `my-session-${Math.random()}`; + + // run search + const searchRes1 = await supertest + .post(`/internal/search/ese`) + .set('kbn-xsrf', 'foo') + .send({ + sessionId, + params: { + body: { + query: { + term: { + agent: '1', + }, + }, + }, + wait_for_completion_timeout: '1ms', + }, + }) + .expect(200); + + const { id: id1 } = searchRes1.body; + + // it might take the session a moment to be created + await new Promise((resolve) => setTimeout(resolve, 1000)); + + const getSessionFirstTime = await supertest + .get(`/internal/session/${sessionId}`) + .set('kbn-xsrf', 'foo') + .expect(200); + + // poll on search + await supertest + .post(`/internal/search/ese/${id1}`) + .set('kbn-xsrf', 'foo') + .send({ + sessionId, + }) + .expect(200); + + const getSessionSecondTime = await supertest + .get(`/internal/session/${sessionId}`) + .set('kbn-xsrf', 'foo') + .expect(200); + + expect(getSessionFirstTime.body.attributes.sessionId).to.be.equal( + getSessionSecondTime.body.attributes.sessionId + ); + expect(getSessionFirstTime.body.attributes.touched).to.be.lessThan( + getSessionSecondTime.body.attributes.touched + ); + }); }); } diff --git a/x-pack/test/api_integration/config.ts b/x-pack/test/api_integration/config.ts index 546b23ab4f26c..8563d60ca68fc 100644 --- a/x-pack/test/api_integration/config.ts +++ b/x-pack/test/api_integration/config.ts @@ -31,6 +31,8 @@ export async function getApiIntegrationConfig({ readConfigFile }: FtrConfigProvi '--xpack.fleet.enabled=true', '--xpack.fleet.agents.pollingRequestTimeout=5000', // 5 seconds '--xpack.data_enhanced.search.sessions.enabled=true', // enable WIP send to background UI + '--xpack.data_enhanced.search.sessions.notTouchedTimeout=15s', // shorten notTouchedTimeout for quicker testing + '--xpack.data_enhanced.search.sessions.trackingInterval=5s', // shorten trackingInterval for quicker testing ], }, esTestCluster: { diff --git a/x-pack/test/functional/es_archives/data/search_sessions/data.json.gz b/x-pack/test/functional/es_archives/data/search_sessions/data.json.gz index 28260ee99e4dc0c428d5e2a75a0a14db990e2723..51e8c09f19247f979a9f1823ab5786b99cf849e4 100644 GIT binary patch literal 1976 zcmV;p2S@lHiwFqBRT*Fa17u-zVJ>QOZ*BnXn_E-cI26a<`zgHMx1Gi#`Ib0O%eKI> zvjs{*J8kJ?6k7%_j-A>vTZZX(UrElzNd*Z_vDeIjv|w46qw|-3(s1V7X*#=|QoXO~ zLF(%-HT{Ofg?PTx^e7PJ`IPvyAJC8#F%vbSB%oi#8{;|}lZ1@0D9zI%^Jfhw(Pc(h zlqT!eCG&rDy6BX+RhaMudrC6Pc)k%?`GQrq##YzG)k{+x^HJi|-dN?lwEvPm?gZ z?Dq`By|R^m%NFJD<06WK<6^>}JcPQ~^GwI?+Q{>;bd8+L(Y#B?x3TG3sAEyr>R?RA z9YSrlL$T!&8U~0rEhnzla51vpHM|EzoX>eye}mun$HIo=T95LZ^iyX_7^7Lz-)d%C zd3UAXvM7vb9PBZci5cp5-$WBxCZ9ij7SAl2&;wC+JX=Sk8mCDu+%M2Gqzl{bKng5($@y1_rywD z$d6YKw?&Ny7qi_ry9Rc9J7R-nB*}>{N=V#yU5_FX{Df7$VWc&-On4ucbV3uRCX8d4 z2$|OMO9dGG5MfHv+Q{%s`$q_~YW-%W%aNR?5C_*SPC#!)EKTEFtqn39$Ho=-cU9uJ z?bfE;l%1UBV$b%!d_j0SSFbi)_iU8=Dd&J_#N8=ut{RN++QG=H(XZ2}yeBWKncr8N z3b^EKaaq@1@`=3R14qg1s@mhXQ3v~`*O^^26u*X*!vV7mRL(6h; z=(wTlQ|g5Q4!Z;iTI&b#Ux)7w-{KE(K+eyncs#lK!#~6y-d$b~YQ7&H z3{D1kNDmns;N$)G*VF0wFh$3&&;Kp;j*^H)BpxtvOWSa?l*C>x3@O5Cc}N{S>U@k6 z-sVOwc%2k16=m|+C{36j8S)5~#g+H^4E5qd-@%q0Q2qPkWkS!mp)zHVWQQx-u$TfPFEf)u$ z=-TzG``z`c$myRX7sq@^;+)E|`E}wb>6~WiC0~ZQHk*JZ7d|gxX+}ZC|Fequ4;jlP zE%NeCIGWDHm#EF+Lz+;2qfN82%hyMPS10fG&qjNvr=#<;zek7r$NOh{!;`a(XkUZn z<|~kG8Q8OTB)M4w$rXrJk=)u$a$zfw92oFZNd79FOk>Iy7jV3F8t%w(ewwc>5IG5#c3#f@ebTE&q146TuZ|RuS9*g2Vo=^BPY0 zK8#~vcndY$^%_2x{+UqRse$4OJgX?~0>yy73XEX{!6t|5;Ap|w65bxChaRr@K z5cdG$0DuOAnt<@OCAG4r^%%zEYMbCs&Ig`W%)o%b#ZvqkSMA;6;y}E7Kx&8 zc);#|(wM3R3~z~w5K~1%$UnL5QhV2wp>?s0z(0fYG4Y68RC zq9Ft|s}whDNEB7!Sw(S36a@k_7t{oTw?#h)YA)zqL#Cz*%PM+9rY7wErh}ReFuWxy zLQr#!=A1DXwL0>|5;Ap|w65U(LqQ$=SL#354?0MKMm6A<1O1tF+e zA$Sd$nkqD_2o9N=u>YG4Y68RCq9Ft|S14XXrltzdDvCp;xpe7K!E&4snpyq#x KdKBy*oB#mBztLO( literal 1956 zcmZ9Jc|6m79KeSh9TP&B^*E=Nk)y(zq+{8LFt-&g=T?br)XoBaqK zZlb5cW0S}AEXNoT5lf!+Jg?`U=dbr4-`D5+{(Rrx-{^93mpp!)0Z9@3BK(X(!Y^I` z&E|N!3|c71j)?ZMhHD8@t*9J_!wEVeYp7D*Bil)Yt)84uT*GW^y?+%tW0T~j_N zzst5vQ8k>o9AWg>JlB#6&98gttUIN4dY6VVvJGq(`ZJnql9pJL>7vx5_0)vfo3wJ0 zm3VFZV)qQe-qf_Ifzv&slMBw(%amrO9g0HJ)7DeU{dKH=H$&}Ee|VATdgPqf2g3sW zs0HTY>NpDmF@Xo!(IIcgOOmo#HGYpKq#u8Fr?oBcCa~!0ZFGWH=l$d{oHKOA$)?of#1DMyBX+L6o41yzk9C_(?owkc!ja#8) zSFKe`6{9S(9R&&Uz##~kG(F&(EU3kn@X31slr3MbD(^Bs1Y+cNAhKv9R&g@w(YKK& z2bZg=1d+@6OR%NBhiRoxkE;?n?DFb*)91vb9oZ+MaJYG>26g1S!bjMn!!Z-@ai)2) z4YOA!(61N4T?cV+FSwW6`IVYm$_=k9dKZs2nsPr*SH7DDh`r=8G&9G(I9O@RZ8X`= zNU&SSE)wE1zC*)sC+}4Iial`Us$Rxt7lMV$83j(koLub`S*?4`GFSqf-x=t>NSmv?hGnI{*?t4V;vxS0q#YCy8rrK1v z&w*%Wa9G!_FBz}#YCC13($j>jfLA(R731|6%TTAB0c8K0z${7$X`OGTTb9o%Jj4^Y6p*AwLY%5 zwi*A*uW&zL&(q9|uw`Q$)VS3OEa7~$(GT+Z+{q0LALi+-uLpJgfdR%eQ?KA7Z+!U9 zJe11VMOK5UL&9|%2DQld7Y~G;IH@=>dDFu9csT#13E%3lj9%mTr@`SZ0h93&l9tNo z7O*aaYS)rrh5sC}UO7p9Y4LR}k4`dp+O`tdNqb~+Brt?dkJdp+hS%3#d}r=Br=ycfv^N6<3a8F1s>9t#`9 z)~~l{=V9g8s}hSB-@MLWXTKcl z95}&vo*K3o(_ZBuL?ZnMdtyn_cX2#GVo+h;F@Dy4m5}W`kp-IVAGm}Lx@FdTc!&R> zMq;^5RT6LJdWod}(drFz|1Vo_N#SeW$05IZrNJU$wZmT>oseT*jeaT>zHjz#5~cfA zZr)mFE`7G(l|#M%t~e(JXpeq750&^ts?sJvb6Vy?wE)RhxWCAT zm`L&UM30ia4dp!BWh!F8wQiR@ZiFU-xC+8G76T*%cnU0r1A&j|dPY40B(=f`d#8z@ z0fo84vI9}ji6=GL8`wo5t66`l42Bx34x)=}dcwn;gLZRVp%SI(k{E<`9|wjafhgEA zIlK4!iJ{_?1&?^rZVoM$`(SBPP)%z>$`Vn@`zW(;i+HD_O#Nf^K!L%09YU2>5Gcjf zz#wp%ZewC-^Y+6~5P?seM(KfE=>K=I>hqLGQQ*=zXrrTehu_- z0-(@;)Sm7CSCju$lmD+S)BQ(XcJ4p*=%+YmtQ_TzCmKZYCW<@3SO{=o23+-7iWmxW z*(^rv48YsYVr+E{Eg`#_c*KX4kZDfbY;uIee}7S)*MGY8oC=WFuPBK>*RS|wNEKq+ zUu8!Ga~N>tXNh7c%zk1q-pl|V;^JVRUpIw$#EU^Y%Y8uJboN&#vd7gFt@+=FP|z diff --git a/x-pack/test/functional/es_archives/data/search_sessions/mappings.json b/x-pack/test/functional/es_archives/data/search_sessions/mappings.json index 24bbcbea23385..4492bcae7047d 100644 --- a/x-pack/test/functional/es_archives/data/search_sessions/mappings.json +++ b/x-pack/test/functional/es_archives/data/search_sessions/mappings.json @@ -310,6 +310,9 @@ "created": { "type": "date" }, + "touched": { + "type": "date" + }, "expires": { "type": "date" }, @@ -324,6 +327,9 @@ "name": { "type": "keyword" }, + "persisted": { + "type": "boolean" + }, "restoreState": { "enabled": false, "type": "object" diff --git a/x-pack/test/send_search_to_background_integration/tests/apps/dashboard/async_search/send_to_background.ts b/x-pack/test/send_search_to_background_integration/tests/apps/dashboard/async_search/send_to_background.ts index 3e417551c3cb9..35ee15472f346 100644 --- a/x-pack/test/send_search_to_background_integration/tests/apps/dashboard/async_search/send_to_background.ts +++ b/x-pack/test/send_search_to_background_integration/tests/apps/dashboard/async_search/send_to_background.ts @@ -80,6 +80,7 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) { ); // load URL to restore a saved session + // TODO: replace with clicking on "Re-run link" const url = await browser.getCurrentUrl(); const savedSessionURL = `${url}&searchSessionId=${savedSessionId}`; await browser.get(savedSessionURL); diff --git a/x-pack/test/send_search_to_background_integration/tests/apps/dashboard/async_search/send_to_background_relative_time.ts b/x-pack/test/send_search_to_background_integration/tests/apps/dashboard/async_search/send_to_background_relative_time.ts index 25291fd74b322..5d5cdb29523bd 100644 --- a/x-pack/test/send_search_to_background_integration/tests/apps/dashboard/async_search/send_to_background_relative_time.ts +++ b/x-pack/test/send_search_to_background_integration/tests/apps/dashboard/async_search/send_to_background_relative_time.ts @@ -19,13 +19,13 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) { 'home', 'timePicker', 'maps', + 'searchSessionsManagement', ]); const dashboardPanelActions = getService('dashboardPanelActions'); const inspector = getService('inspector'); const pieChart = getService('pieChart'); const find = getService('find'); const dashboardExpect = getService('dashboardExpect'); - const browser = getService('browser'); const searchSessions = getService('searchSessions'); describe('send to background with relative time', () => { @@ -59,23 +59,20 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) { await PageObjects.timePicker.pauseAutoRefresh(); // sample data has auto-refresh on await PageObjects.header.waitUntilLoadingHasFinished(); await PageObjects.dashboard.waitForRenderComplete(); - await checkSampleDashboardLoaded(); await searchSessions.expectState('completed'); await searchSessions.save(); await searchSessions.expectState('backgroundCompleted'); - const savedSessionId = await dashboardPanelActions.getSearchSessionIdByTitle( - '[Flights] Airline Carrier' - ); - const resolvedTimeRange = await getResolvedTimeRangeFromPanel('[Flights] Airline Carrier'); + + await checkSampleDashboardLoaded(); // load URL to restore a saved session - const url = await browser.getCurrentUrl(); - const savedSessionURL = `${url}&searchSessionId=${savedSessionId}` - .replace('now-24h', `'${resolvedTimeRange.gte}'`) - .replace('now', `'${resolvedTimeRange.lte}'`); - log.debug('Trying to restore session by URL:', savedSessionId); - await browser.get(savedSessionURL); + await PageObjects.searchSessionsManagement.goTo(); + const searchSessionList = await PageObjects.searchSessionsManagement.getList(); + + // navigate to dashboard + await searchSessionList[0].view(); + await PageObjects.header.waitUntilLoadingHasFinished(); await PageObjects.dashboard.waitForRenderComplete(); await checkSampleDashboardLoaded(); @@ -87,16 +84,6 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) { // HELPERS - async function getResolvedTimeRangeFromPanel( - panelTitle: string - ): Promise<{ gte: string; lte: string }> { - await dashboardPanelActions.openInspectorByTitle(panelTitle); - await inspector.openInspectorRequestsView(); - await (await inspector.getOpenRequestDetailRequestButton()).click(); - const request = JSON.parse(await inspector.getCodeEditorValue()); - return request.query.bool.filter.find((f: any) => f.range).range.timestamp; - } - async function checkSampleDashboardLoaded() { log.debug('Checking no error labels'); await testSubjects.missingOrFail('embeddableErrorLabel');