From 9c042657a908f1e4c28d5a99b7bc5469d8917396 Mon Sep 17 00:00:00 2001 From: Lukas Olson Date: Tue, 22 Dec 2020 11:19:59 -0700 Subject: [PATCH 1/2] [data.search.session] Change search session client to implement plugin --- src/plugins/data/server/index.ts | 1 + .../data/server/search/search_service.ts | 23 ++- .../server/search/session/session_service.ts | 66 ++++++-- .../data/server/search/session/types.ts | 18 +-- x-pack/plugins/data_enhanced/server/plugin.ts | 15 +- .../server/search/session/session_service.ts | 151 +++++++++++------- 6 files changed, 179 insertions(+), 95 deletions(-) diff --git a/src/plugins/data/server/index.ts b/src/plugins/data/server/index.ts index b42619500525b..02ea7b061196c 100644 --- a/src/plugins/data/server/index.ts +++ b/src/plugins/data/server/index.ts @@ -246,6 +246,7 @@ export { SearchUsage, SessionService, ISessionService, + ISessionServiceDependencies, } from './search'; // Search namespace diff --git a/src/plugins/data/server/search/search_service.ts b/src/plugins/data/server/search/search_service.ts index 6c8683220bc4c..5c352b999c369 100644 --- a/src/plugins/data/server/search/search_service.ts +++ b/src/plugins/data/server/search/search_service.ts @@ -72,11 +72,11 @@ import { } from '../../common/search/aggs/buckets/shard_delay'; import { aggShardDelay } from '../../common/search/aggs/buckets/shard_delay_fn'; import { ConfigSchema } from '../../config'; -import { SessionService, IScopedSessionService, ISessionService } from './session'; +import { SessionService, ISessionService } from './session'; declare module 'src/core/server' { interface RequestHandlerContext { - search?: ISearchClient & { session: IScopedSessionService }; + search?: ISearchClient & { session: ISearchClient }; } } @@ -130,6 +130,8 @@ export class SearchService implements Plugin { registerSearchRoute(router); registerMsearchRoute(router, routeDependencies); + this.sessionService.setup(core, {}); + core.getStartServices().then(([coreStart]) => { this.coreStart = coreStart; }); @@ -292,7 +294,6 @@ export class SearchService implements Plugin { SearchStrategyRequest extends IKibanaSearchRequest = IEsSearchRequest, SearchStrategyResponse extends IKibanaSearchResponse = IEsSearchResponse >( - session: IScopedSessionService, request: SearchStrategyRequest, options: ISearchOptions, deps: SearchStrategyDependencies @@ -300,7 +301,7 @@ export class SearchService implements Plugin { const strategy = this.getSearchStrategy( options.strategy ); - return session.search(strategy, request, options, deps); + return strategy.search(request, options, deps); }; private cancel = (id: string, options: ISearchOptions, deps: SearchStrategyDependencies) => { @@ -326,7 +327,6 @@ export class SearchService implements Plugin { const { elasticsearch, savedObjects, uiSettings } = core; const getSessionAsScoped = this.sessionService.asScopedProvider(core); return (request: KibanaRequest): ISearchClient => { - const scopedSession = getSessionAsScoped(request); const savedObjectsClient = savedObjects.getScopedClient(request); const deps = { savedObjectsClient, @@ -334,9 +334,16 @@ export class SearchService implements Plugin { uiSettingsClient: uiSettings.asScopedToClient(savedObjectsClient), }; return { - search: (searchRequest, options = {}) => - this.search(scopedSession, searchRequest, options, deps), - cancel: (id, options = {}) => this.cancel(id, options, deps), + search: (searchRequest, options = {}) => { + return options.sessionId + ? getSessionAsScoped(request).search(searchRequest, options) + : this.search(searchRequest, options, deps); + }, + cancel: (id, options = {}) => { + return options.sessionId + ? getSessionAsScoped(request).cancel(id, options) + : this.cancel(id, options, deps); + }, }; }; }; diff --git a/src/plugins/data/server/search/session/session_service.ts b/src/plugins/data/server/search/session/session_service.ts index 15021436d8821..66c008d2d7948 100644 --- a/src/plugins/data/server/search/session/session_service.ts +++ b/src/plugins/data/server/search/session/session_service.ts @@ -17,27 +17,69 @@ * under the License. */ -import { CoreStart, KibanaRequest } from 'kibana/server'; -import { IKibanaSearchRequest, IKibanaSearchResponse } from '../../../common'; -import { ISearchStrategy } from '../types'; -import { ISessionService } from './types'; +import { CoreSetup, KibanaRequest } from 'kibana/server'; +import { ISessionService, ISessionServiceDependencies } from './types'; +import { DataPluginStart } from '../../plugin'; +import { + IKibanaSearchRequest, + IKibanaSearchResponse, + ISearchClient, + ISearchOptions, +} from '../../../common'; /** * The OSS session service. See data_enhanced in X-Pack for the background session service. */ export class SessionService implements ISessionService { + protected searchAsScoped!: (request: KibanaRequest) => ISearchClient; + constructor() {} - public search( - strategy: ISearchStrategy, - ...args: Parameters['search']> + public setup(core: CoreSetup<{}, DataPluginStart>) { + core.getStartServices().then(([, , dataStart]) => { + this.searchAsScoped = dataStart.search.asScoped; + }); + } + + public start() {} + + public stop() {} + + /** + * Forward this search request to the search service, removing the sessionId (so the search client + * doesn't forward back to this service). + */ + private search( + { searchClient }: ISessionServiceDependencies, + request: Request, + { sessionId, ...options }: ISearchOptions ) { - return strategy.search(...args); + return searchClient.search(request, options); } - public asScopedProvider(core: CoreStart) { - return (request: KibanaRequest) => ({ - search: this.search, - }); + /** + * Forward this search request to the search service, removing the sessionId (so the search client + * doesn't forward back to this service). + */ + private cancel( + { searchClient }: ISessionServiceDependencies, + id: string, + { sessionId, ...options }: ISearchOptions = {} + ) { + return searchClient.cancel(id, options); + } + + public asScopedProvider() { + return (request: KibanaRequest) => { + const searchClient = this.searchAsScoped(request); + const deps = { searchClient }; + return { + search: ( + searchRequest: Request, + options: ISearchOptions = {} + ) => this.search(deps, searchRequest, options), + cancel: this.cancel.bind(this, deps), + }; + }; } } diff --git a/src/plugins/data/server/search/session/types.ts b/src/plugins/data/server/search/session/types.ts index 5e179b99952fe..732502be2cbae 100644 --- a/src/plugins/data/server/search/session/types.ts +++ b/src/plugins/data/server/search/session/types.ts @@ -17,19 +17,13 @@ * under the License. */ -import { Observable } from 'rxjs'; -import { CoreStart, KibanaRequest } from 'kibana/server'; -import { ISearchStrategy } from '../types'; -import { IKibanaSearchRequest, IKibanaSearchResponse } from '../../../common/search'; +import { CoreStart, KibanaRequest, Plugin } from 'kibana/server'; +import { ISearchClient } from '../../../common'; -export interface IScopedSessionService { - search: ( - strategy: ISearchStrategy, - ...args: Parameters['search']> - ) => Observable; - [prop: string]: any; +export interface ISessionService extends Plugin { + asScopedProvider: (core: CoreStart) => (request: KibanaRequest) => T; } -export interface ISessionService { - asScopedProvider: (core: CoreStart) => (request: KibanaRequest) => IScopedSessionService; +export interface ISessionServiceDependencies { + searchClient: ISearchClient; } diff --git a/x-pack/plugins/data_enhanced/server/plugin.ts b/x-pack/plugins/data_enhanced/server/plugin.ts index d0757ca5111b6..28106ec817479 100644 --- a/x-pack/plugins/data_enhanced/server/plugin.ts +++ b/x-pack/plugins/data_enhanced/server/plugin.ts @@ -15,17 +15,29 @@ import { ENHANCED_ES_SEARCH_STRATEGY, EQL_SEARCH_STRATEGY } from '../common'; import { registerSessionRoutes } from './routes'; import { backgroundSessionMapping } from './saved_objects'; import { + BackgroundSessionClient, BackgroundSessionService, enhancedEsSearchStrategyProvider, eqlSearchStrategyProvider, } from './search'; import { getUiSettings } from './ui_settings'; +import { ISearchClient } from '../../../../src/plugins/data/common'; interface SetupDependencies { data: DataPluginSetup; usageCollection?: UsageCollectionSetup; } +export interface DataEnhancedPluginsStart { + data: DataPluginStart; +} + +declare module 'src/core/server' { + interface RequestHandlerContext { + search?: ISearchClient & { session: BackgroundSessionClient }; + } +} + export class EnhancedDataServerPlugin implements Plugin { private readonly logger: Logger; private sessionService!: BackgroundSessionService; @@ -34,7 +46,7 @@ export class EnhancedDataServerPlugin implements Plugin, deps: SetupDependencies) { + public setup(core: CoreSetup, deps: SetupDependencies) { const usage = deps.usageCollection ? usageProvider(core) : undefined; core.uiSettings.register(getUiSettings()); @@ -55,6 +67,7 @@ export class EnhancedDataServerPlugin implements Plugin; } -export class BackgroundSessionService implements ISessionService { +export type BackgroundSessionClient = ReturnType< + ReturnType +>; + +export class BackgroundSessionService implements ISessionService { /** * Map of sessionId to { [requestHash]: searchId } * @private @@ -59,11 +65,18 @@ export class BackgroundSessionService implements ISessionService { private sessionSearchMap = new Map(); private internalSavedObjectsClient!: SavedObjectsClientContract; private monitorTimer!: NodeJS.Timeout; + protected searchAsScoped!: (request: KibanaRequest) => ISearchClient; constructor(private readonly logger: Logger) {} - public async start(core: CoreStart, config$: Observable) { - return this.setupMonitoring(core, config$); + public setup(core: CoreSetup) { + core.getStartServices().then(([, { data }]) => { + this.searchAsScoped = data.search.asScoped; + }); + } + + public start(core: CoreStart, config$: Observable) { + this.setupMonitoring(core, config$); } public stop() { @@ -71,7 +84,7 @@ export class BackgroundSessionService implements ISessionService { clearTimeout(this.monitorTimer); } - private setupMonitoring = async (core: CoreStart, config$: Observable) => { + private async setupMonitoring(core: CoreStart, config$: Observable) { const config = await config$.pipe(first()).toPromise(); if (config.search.sendToBackground.enabled) { this.logger.debug(`setupMonitoring | Enabling monitoring`); @@ -79,7 +92,7 @@ export class BackgroundSessionService implements ISessionService { this.internalSavedObjectsClient = new SavedObjectsClient(internalRepo); this.monitorMappedIds(); } - }; + } /** * Gets all {@link SessionSavedObjectAttributes | Background Searches} that @@ -104,7 +117,7 @@ export class BackgroundSessionService implements ISessionService { return res.saved_objects; } - private clearSessions = () => { + private clearSessions() { const curTime = moment(); this.sessionSearchMap.forEach((sessionInfo, sessionId) => { @@ -119,7 +132,7 @@ export class BackgroundSessionService implements ISessionService { this.sessionSearchMap.delete(sessionId); } }); - }; + } private async monitorMappedIds() { this.monitorTimer = setTimeout(async () => { @@ -183,32 +196,47 @@ export class BackgroundSessionService implements ISessionService { } public search( - strategy: ISearchStrategy, + deps: BackgroundSessionDependencies, searchRequest: Request, - options: ISearchOptions, - searchDeps: SearchStrategyDependencies, - deps: BackgroundSessionDependencies + options: ISearchOptions = {} ): Observable { + const { sessionId, ...otherOptions } = options; + // If this is a restored background search session, look up the ID using the provided sessionId const getSearchRequest = async () => !options.isRestore || searchRequest.id ? searchRequest : { ...searchRequest, - id: await this.getId(searchRequest, options, deps), + id: await this.getId(deps, searchRequest, options), }; return from(getSearchRequest()).pipe( - switchMap((request) => strategy.search(request, options, searchDeps)), + switchMap((request) => deps.searchClient.search(request, otherOptions)), tapFirst((response) => { - if (searchRequest.id || !options.sessionId || !response.id || options.isRestore) return; - this.trackId(searchRequest, response.id, options, deps); + if (searchRequest.id || !sessionId || !response.id || options.isRestore) return; + this.trackId(deps, searchRequest, response.id, options); }) ); } + public cancel( + deps: BackgroundSessionDependencies, + searchId: string, + { sessionId, ...options }: ISearchOptions = {} + ) { + if (sessionId && this.sessionSearchMap.has(sessionId)) { + const map = this.sessionSearchMap.get(sessionId)!; + map.ids.forEach((value, key) => { + if (value === searchId) map.ids.delete(key); + }); + } + return deps.searchClient.cancel(searchId, options); + } + // TODO: Generate the `userId` from the realm type/realm name/username - public save = async ( + public async save( + { savedObjectsClient }: BackgroundSessionDependencies, sessionId: string, { name, @@ -219,9 +247,8 @@ export class BackgroundSessionService implements ISessionService { urlGeneratorId, initialState = {}, restoreState = {}, - }: Partial, - { savedObjectsClient }: BackgroundSessionDependencies - ) => { + }: Partial + ) { if (!name) throw new Error('Name is required'); if (!appId) throw new Error('AppId is required'); if (!urlGeneratorId) throw new Error('UrlGeneratorId is required'); @@ -247,58 +274,58 @@ export class BackgroundSessionService implements ISessionService { ); return session; - }; + } // TODO: Throw an error if this session doesn't belong to this user - public get = (sessionId: string, { savedObjectsClient }: BackgroundSessionDependencies) => { + public get({ savedObjectsClient }: BackgroundSessionDependencies, sessionId: string) { this.logger.debug(`get | ${sessionId}`); return savedObjectsClient.get( BACKGROUND_SESSION_TYPE, sessionId ); - }; + } // TODO: Throw an error if this session doesn't belong to this user - public find = ( - options: BackgroundSessionFindOptions, - { savedObjectsClient }: BackgroundSessionDependencies - ) => { + public find( + { savedObjectsClient }: BackgroundSessionDependencies, + options: BackgroundSessionFindOptions + ) { return savedObjectsClient.find({ ...options, type: BACKGROUND_SESSION_TYPE, }); - }; + } // TODO: Throw an error if this session doesn't belong to this user - public update = ( + public update( + { savedObjectsClient }: BackgroundSessionDependencies, sessionId: string, - attributes: Partial, - { savedObjectsClient }: BackgroundSessionDependencies - ) => { + attributes: Partial + ) { this.logger.debug(`update | ${sessionId}`); return savedObjectsClient.update( BACKGROUND_SESSION_TYPE, sessionId, attributes ); - }; + } // TODO: Throw an error if this session doesn't belong to this user - public delete = (sessionId: string, { savedObjectsClient }: BackgroundSessionDependencies) => { + public delete({ savedObjectsClient }: BackgroundSessionDependencies, sessionId: string) { return savedObjectsClient.delete(BACKGROUND_SESSION_TYPE, sessionId); - }; + } /** * Tracks the given search request/search ID in the saved session (if it exists). Otherwise, just * store it in memory until a saved session exists. * @internal */ - public trackId = async ( + public async trackId( + deps: BackgroundSessionDependencies, searchRequest: IKibanaSearchRequest, searchId: string, - { sessionId, isStored }: ISearchOptions, - deps: BackgroundSessionDependencies - ) => { + { sessionId, isStored }: ISearchOptions + ) { if (!sessionId || !searchId) return; this.logger.debug(`trackId | ${sessionId} | ${searchId}`); const requestHash = createRequestHash(searchRequest.params); @@ -307,7 +334,7 @@ export class BackgroundSessionService implements ISessionService { // Otherwise, just update the in-memory mapping for this session for when the session is saved. if (isStored) { const attributes = { idMapping: { [requestHash]: searchId } }; - await this.update(sessionId, attributes, deps); + await this.update(deps, sessionId, attributes); } else { const map = this.sessionSearchMap.get(sessionId) ?? { insertTime: moment(), @@ -317,7 +344,7 @@ export class BackgroundSessionService implements ISessionService { map.ids.set(requestHash, searchId); this.sessionSearchMap.set(sessionId, map); } - }; + } /** * Look up an existing search ID that matches the given request in the given session so that the @@ -325,9 +352,9 @@ export class BackgroundSessionService implements ISessionService { * @internal */ public getId = async ( + deps: BackgroundSessionDependencies, searchRequest: IKibanaSearchRequest, - { sessionId, isStored, isRestore }: ISearchOptions, - deps: BackgroundSessionDependencies + { sessionId, isStored, isRestore }: ISearchOptions ) => { if (!sessionId) { throw new Error('Session ID is required'); @@ -337,7 +364,7 @@ export class BackgroundSessionService implements ISessionService { throw new Error('Get search ID is only supported when restoring a session'); } - const session = await this.get(sessionId, deps); + const session = await this.get(deps, sessionId); const requestHash = createRequestHash(searchRequest.params); if (!session.attributes.idMapping.hasOwnProperty(requestHash)) { throw new Error('No search ID in this session matching the given search request'); @@ -346,25 +373,25 @@ export class BackgroundSessionService implements ISessionService { return session.attributes.idMapping[requestHash]; }; - public asScopedProvider = ({ savedObjects }: CoreStart) => { + public asScopedProvider({ savedObjects }: CoreStart) { return (request: KibanaRequest) => { const savedObjectsClient = savedObjects.getScopedClient(request, { includedHiddenTypes: [BACKGROUND_SESSION_TYPE], }); - const deps = { savedObjectsClient }; + const searchClient = this.searchAsScoped(request); + const deps = { savedObjectsClient, searchClient }; return { search: ( - strategy: ISearchStrategy, - ...args: Parameters['search']> - ) => this.search(strategy, ...args, deps), - save: (sessionId: string, attributes: Partial) => - this.save(sessionId, attributes, deps), - get: (sessionId: string) => this.get(sessionId, deps), - find: (options: BackgroundSessionFindOptions) => this.find(options, deps), - update: (sessionId: string, attributes: Partial) => - this.update(sessionId, attributes, deps), - delete: (sessionId: string) => this.delete(sessionId, deps), + searchRequest: Request, + options: ISearchOptions = {} + ) => this.search(deps, searchRequest, options), + cancel: this.cancel.bind(this, deps), + save: this.save.bind(this, deps), + get: this.get.bind(this, deps), + find: this.find.bind(this, deps), + update: this.update.bind(this, deps), + delete: this.delete.bind(this, deps), }; }; - }; + } } From 20bb7bb837d54f837084fc12e466fd95a557f05c Mon Sep 17 00:00:00 2001 From: Lukas Olson Date: Tue, 5 Jan 2021 11:20:17 -0700 Subject: [PATCH 2/2] [data.search.session] Add extend method and expose route --- .../public/search/session/sessions_client.ts | 6 +++ .../data_enhanced/server/routes/session.ts | 35 ++++++++++++++ .../server/search/session/session_service.ts | 47 +++++++++++++++++-- 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/src/plugins/data/public/search/session/sessions_client.ts b/src/plugins/data/public/search/session/sessions_client.ts index 38be647a37c7a..477b398be2011 100644 --- a/src/plugins/data/public/search/session/sessions_client.ts +++ b/src/plugins/data/public/search/session/sessions_client.ts @@ -82,4 +82,10 @@ export class SessionsClient { public delete(sessionId: string): Promise { return this.http!.delete(`/internal/session/${encodeURIComponent(sessionId)}`); } + + public extend(sessionId: string, keepAlive: string): Promise { + return this.http!.post(`/internal/session/${encodeURIComponent(sessionId)}/_extend`, { + body: JSON.stringify({ keepAlive }), + }); + } } diff --git a/x-pack/plugins/data_enhanced/server/routes/session.ts b/x-pack/plugins/data_enhanced/server/routes/session.ts index b056513f1d2f5..d1da982dd3777 100644 --- a/x-pack/plugins/data_enhanced/server/routes/session.ts +++ b/x-pack/plugins/data_enhanced/server/routes/session.ts @@ -160,4 +160,39 @@ export function registerSessionRoutes(router: IRouter): void { } } ); + + router.post( + { + path: '/internal/session/{id}/_extend', + validate: { + params: schema.object({ + id: schema.string(), + }), + body: schema.object({ + keepAlive: schema.string(), + }), + }, + }, + async (context, request, res) => { + const { id } = request.params; + const { keepAlive } = request.body; + try { + const response = await context.search!.session.extend(id, keepAlive); + + return res.ok({ + body: response, + }); + } catch (err) { + return res.customError({ + statusCode: err.statusCode || 500, + body: { + message: err.message, + attributes: { + error: err.body?.error || err.message, + }, + }, + }); + } + } + ); } diff --git a/x-pack/plugins/data_enhanced/server/search/session/session_service.ts b/x-pack/plugins/data_enhanced/server/search/session/session_service.ts index 7986a52862ef8..9de6f89400045 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/session_service.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/session_service.ts @@ -7,6 +7,7 @@ import moment, { Moment } from 'moment'; import { from, Observable } from 'rxjs'; import { first, switchMap } from 'rxjs/operators'; +import dateMath from '@elastic/datemath'; import { CoreSetup, CoreStart, @@ -331,9 +332,48 @@ export class BackgroundSessionService implements ISessionService { + return this.cancel(deps, id, { strategy }); + }); + await Promise.all(promises); + + return deps.savedObjectsClient.delete(BACKGROUND_SESSION_TYPE, sessionId); + } + + /** + * Extend the TTL of a session by updating the saved object and extending the TTL of each search + * request in the ID mapping. + */ + public async extend(deps: BackgroundSessionDependencies, sessionId: string, keepAlive: string) { + if (!sessionId) { + throw new Error('Session ID is required'); + } + + // Calculate the new `expires` value given the `keepAlive` + const expiresMoment = dateMath.parse(`now+${keepAlive}`); + if (!expiresMoment || isNaN(expiresMoment.date())) { + throw new Error(`"${keepAlive}" is not a valid value for keepAlive`); + } + + // Extend the TTL of each search request in the ID mapping + const expires = expiresMoment.toISOString(); + const session = await this.get(deps, sessionId); + const searchRequests = Object.values(session.attributes.idMapping); + const promises = searchRequests.map(({ id, strategy }) => { + return deps.searchClient.extend(id, keepAlive, { strategy }); + }); + await Promise.all(promises); + + // Update the `expires` value in the search session saved object + return this.update(deps, sessionId, { expires }); } /** @@ -361,7 +401,7 @@ export class BackgroundSessionService implements ISessionService