From 4e8c8ebaffbb4f4d68ac536a3226b1ab1b19b27b Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Fri, 20 Nov 2020 13:49:30 -0500 Subject: [PATCH] [event_log] index event docs in bulk instead of individually (redo) (#83927) resolves #55634 resolves #65746 Buffers event docs being written for a fixed interval / buffer size, and indexes those docs via a bulk ES call. Also now flushing those buffers at plugin stop() time, which we couldn't do before with the single index calls, which were run via `setImmediate()`. This is a redo of PR https://github.com/elastic/kibana/pull/80941 which had to be reverted. --- .../server/es/cluster_client_adapter.mock.ts | 2 + .../server/es/cluster_client_adapter.test.ts | 166 ++++++++++++++++-- .../server/es/cluster_client_adapter.ts | 72 +++++++- .../event_log/server/es/context.mock.ts | 1 + x-pack/plugins/event_log/server/es/context.ts | 6 + .../event_log/server/event_logger.test.ts | 3 +- .../plugins/event_log/server/event_logger.ts | 45 +---- .../server/lib/bounded_queue.test.ts | 161 ----------------- .../event_log/server/lib/bounded_queue.ts | 91 ---------- .../event_log/server/lib/ready_signal.ts | 2 +- .../plugins/event_log/server/plugin.test.ts | 49 ++++++ x-pack/plugins/event_log/server/plugin.ts | 40 +++-- .../plugins/event_log/server/init_routes.ts | 27 --- .../plugins/event_log/server/plugin.ts | 2 - .../event_log/service_api_integration.ts | 20 --- 15 files changed, 314 insertions(+), 373 deletions(-) delete mode 100644 x-pack/plugins/event_log/server/lib/bounded_queue.test.ts delete mode 100644 x-pack/plugins/event_log/server/lib/bounded_queue.ts create mode 100644 x-pack/plugins/event_log/server/plugin.test.ts diff --git a/x-pack/plugins/event_log/server/es/cluster_client_adapter.mock.ts b/x-pack/plugins/event_log/server/es/cluster_client_adapter.mock.ts index bd57958b0cb88..c1f60f2d63049 100644 --- a/x-pack/plugins/event_log/server/es/cluster_client_adapter.mock.ts +++ b/x-pack/plugins/event_log/server/es/cluster_client_adapter.mock.ts @@ -9,6 +9,7 @@ import { IClusterClientAdapter } from './cluster_client_adapter'; const createClusterClientMock = () => { const mock: jest.Mocked = { indexDocument: jest.fn(), + indexDocuments: jest.fn(), doesIlmPolicyExist: jest.fn(), createIlmPolicy: jest.fn(), doesIndexTemplateExist: jest.fn(), @@ -16,6 +17,7 @@ const createClusterClientMock = () => { doesAliasExist: jest.fn(), createIndex: jest.fn(), queryEventsBySavedObject: jest.fn(), + shutdown: jest.fn(), }; return mock; }; diff --git a/x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts b/x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts index 6e787c905d400..57a6b1d3bb932 100644 --- a/x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts +++ b/x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts @@ -4,14 +4,22 @@ * you may not use this file except in compliance with the Elastic License. */ -import { LegacyClusterClient, Logger } from 'src/core/server'; +import { LegacyClusterClient } from 'src/core/server'; import { elasticsearchServiceMock, loggingSystemMock } from 'src/core/server/mocks'; -import { ClusterClientAdapter, IClusterClientAdapter } from './cluster_client_adapter'; +import { + ClusterClientAdapter, + IClusterClientAdapter, + EVENT_BUFFER_LENGTH, +} from './cluster_client_adapter'; +import { contextMock } from './context.mock'; import { findOptionsSchema } from '../event_log_client'; +import { delay } from '../lib/delay'; +import { times } from 'lodash'; type EsClusterClient = Pick, 'callAsInternalUser' | 'asScoped'>; +type MockedLogger = ReturnType; -let logger: Logger; +let logger: MockedLogger; let clusterClient: EsClusterClient; let clusterClientAdapter: IClusterClientAdapter; @@ -21,22 +29,130 @@ beforeEach(() => { clusterClientAdapter = new ClusterClientAdapter({ logger, clusterClientPromise: Promise.resolve(clusterClient), + context: contextMock.create(), }); }); describe('indexDocument', () => { - test('should call cluster client with given doc', async () => { - await clusterClientAdapter.indexDocument({ args: true }); - expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('index', { - args: true, + test('should call cluster client bulk with given doc', async () => { + clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' }); + + await retryUntil('cluster client bulk called', () => { + return clusterClient.callAsInternalUser.mock.calls.length !== 0; + }); + + expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('bulk', { + body: [{ create: { _index: 'event-log' } }, { message: 'foo' }], }); }); - test('should throw error when cluster client throws an error', async () => { - clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); - await expect( - clusterClientAdapter.indexDocument({ args: true }) - ).rejects.toThrowErrorMatchingInlineSnapshot(`"Fail"`); + test('should log an error when cluster client throws an error', async () => { + clusterClient.callAsInternalUser.mockRejectedValue(new Error('expected failure')); + clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' }); + await retryUntil('cluster client bulk called', () => { + return logger.error.mock.calls.length !== 0; + }); + + const expectedMessage = `error writing bulk events: "expected failure"; docs: [{"create":{"_index":"event-log"}},{"message":"foo"}]`; + expect(logger.error).toHaveBeenCalledWith(expectedMessage); + }); +}); + +describe('shutdown()', () => { + test('should work if no docs have been written', async () => { + const result = await clusterClientAdapter.shutdown(); + expect(result).toBeFalsy(); + }); + + test('should work if some docs have been written', async () => { + clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' }); + const resultPromise = clusterClientAdapter.shutdown(); + + await retryUntil('cluster client bulk called', () => { + return clusterClient.callAsInternalUser.mock.calls.length !== 0; + }); + + const result = await resultPromise; + expect(result).toBeFalsy(); + }); +}); + +describe('buffering documents', () => { + test('should write buffered docs after timeout', async () => { + // write EVENT_BUFFER_LENGTH - 1 docs + for (let i = 0; i < EVENT_BUFFER_LENGTH - 1; i++) { + clusterClientAdapter.indexDocument({ body: { message: `foo ${i}` }, index: 'event-log' }); + } + + await retryUntil('cluster client bulk called', () => { + return clusterClient.callAsInternalUser.mock.calls.length !== 0; + }); + + const expectedBody = []; + for (let i = 0; i < EVENT_BUFFER_LENGTH - 1; i++) { + expectedBody.push({ create: { _index: 'event-log' } }, { message: `foo ${i}` }); + } + + expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('bulk', { + body: expectedBody, + }); + }); + + test('should write buffered docs after buffer exceeded', async () => { + // write EVENT_BUFFER_LENGTH + 1 docs + for (let i = 0; i < EVENT_BUFFER_LENGTH + 1; i++) { + clusterClientAdapter.indexDocument({ body: { message: `foo ${i}` }, index: 'event-log' }); + } + + await retryUntil('cluster client bulk called', () => { + return clusterClient.callAsInternalUser.mock.calls.length >= 2; + }); + + const expectedBody = []; + for (let i = 0; i < EVENT_BUFFER_LENGTH; i++) { + expectedBody.push({ create: { _index: 'event-log' } }, { message: `foo ${i}` }); + } + + expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(1, 'bulk', { + body: expectedBody, + }); + + expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(2, 'bulk', { + body: [{ create: { _index: 'event-log' } }, { message: `foo 100` }], + }); + }); + + test('should handle lots of docs correctly with a delay in the bulk index', async () => { + // @ts-ignore + clusterClient.callAsInternalUser.mockImplementation = async () => await delay(100); + + const docs = times(EVENT_BUFFER_LENGTH * 10, (i) => ({ + body: { message: `foo ${i}` }, + index: 'event-log', + })); + + // write EVENT_BUFFER_LENGTH * 10 docs + for (const doc of docs) { + clusterClientAdapter.indexDocument(doc); + } + + await retryUntil('cluster client bulk called', () => { + return clusterClient.callAsInternalUser.mock.calls.length >= 10; + }); + + for (let i = 0; i < 10; i++) { + const expectedBody = []; + for (let j = 0; j < EVENT_BUFFER_LENGTH; j++) { + expectedBody.push( + { create: { _index: 'event-log' } }, + { message: `foo ${i * EVENT_BUFFER_LENGTH + j}` } + ); + } + + expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(i + 1, 'bulk', { + body: expectedBody, + }); + } }); }); @@ -575,3 +691,29 @@ describe('queryEventsBySavedObject', () => { `); }); }); + +type RetryableFunction = () => boolean; + +const RETRY_UNTIL_DEFAULT_COUNT = 20; +const RETRY_UNTIL_DEFAULT_WAIT = 1000; // milliseconds + +async function retryUntil( + label: string, + fn: RetryableFunction, + count: number = RETRY_UNTIL_DEFAULT_COUNT, + wait: number = RETRY_UNTIL_DEFAULT_WAIT +): Promise { + while (count > 0) { + count--; + + if (fn()) return true; + + // eslint-disable-next-line no-console + console.log(`attempt failed waiting for "${label}", attempts left: ${count}`); + + if (count === 0) return false; + await delay(wait); + } + + return false; +} diff --git a/x-pack/plugins/event_log/server/es/cluster_client_adapter.ts b/x-pack/plugins/event_log/server/es/cluster_client_adapter.ts index fa9f9c36052a1..d1dcf621150a6 100644 --- a/x-pack/plugins/event_log/server/es/cluster_client_adapter.ts +++ b/x-pack/plugins/event_log/server/es/cluster_client_adapter.ts @@ -4,20 +4,31 @@ * you may not use this file except in compliance with the Elastic License. */ +import { Subject } from 'rxjs'; +import { bufferTime, filter, switchMap } from 'rxjs/operators'; import { reject, isUndefined } from 'lodash'; import { SearchResponse, Client } from 'elasticsearch'; import type { PublicMethodsOf } from '@kbn/utility-types'; import { Logger, LegacyClusterClient } from 'src/core/server'; - -import { IValidatedEvent, SAVED_OBJECT_REL_PRIMARY } from '../types'; +import { EsContext } from '.'; +import { IEvent, IValidatedEvent, SAVED_OBJECT_REL_PRIMARY } from '../types'; import { FindOptionsType } from '../event_log_client'; +export const EVENT_BUFFER_TIME = 1000; // milliseconds +export const EVENT_BUFFER_LENGTH = 100; + export type EsClusterClient = Pick; export type IClusterClientAdapter = PublicMethodsOf; +export interface Doc { + index: string; + body: IEvent; +} + export interface ConstructorOpts { logger: Logger; clusterClientPromise: Promise; + context: EsContext; } export interface QueryEventsBySavedObjectResult { @@ -30,14 +41,67 @@ export interface QueryEventsBySavedObjectResult { export class ClusterClientAdapter { private readonly logger: Logger; private readonly clusterClientPromise: Promise; + private readonly docBuffer$: Subject; + private readonly context: EsContext; + private readonly docsBufferedFlushed: Promise; constructor(opts: ConstructorOpts) { this.logger = opts.logger; this.clusterClientPromise = opts.clusterClientPromise; + this.context = opts.context; + this.docBuffer$ = new Subject(); + + // buffer event log docs for time / buffer length, ignore empty + // buffers, then index the buffered docs; kick things off with a + // promise on the observable, which we'll wait on in shutdown + this.docsBufferedFlushed = this.docBuffer$ + .pipe( + bufferTime(EVENT_BUFFER_TIME, null, EVENT_BUFFER_LENGTH), + filter((docs) => docs.length > 0), + switchMap(async (docs) => await this.indexDocuments(docs)) + ) + .toPromise(); } - public async indexDocument(doc: unknown): Promise { - await this.callEs>('index', doc); + // This will be called at plugin stop() time; the assumption is any plugins + // depending on the event_log will already be stopped, and so will not be + // writing more event docs. We complete the docBuffer$ observable, + // and wait for the docsBufffered$ observable to complete via it's promise, + // and so should end up writing all events out that pass through, before + // Kibana shuts down (cleanly). + public async shutdown(): Promise { + this.docBuffer$.complete(); + await this.docsBufferedFlushed; + } + + public indexDocument(doc: Doc): void { + this.docBuffer$.next(doc); + } + + async indexDocuments(docs: Doc[]): Promise { + // If es initialization failed, don't try to index. + // Also, don't log here, we log the failure case in plugin startup + // instead, otherwise we'd be spamming the log (if done here) + if (!(await this.context.waitTillReady())) { + return; + } + + const bulkBody: Array> = []; + + for (const doc of docs) { + if (doc.body === undefined) continue; + + bulkBody.push({ create: { _index: doc.index } }); + bulkBody.push(doc.body); + } + + try { + await this.callEs>('bulk', { body: bulkBody }); + } catch (err) { + this.logger.error( + `error writing bulk events: "${err.message}"; docs: ${JSON.stringify(bulkBody)}` + ); + } } public async doesIlmPolicyExist(policyName: string): Promise { diff --git a/x-pack/plugins/event_log/server/es/context.mock.ts b/x-pack/plugins/event_log/server/es/context.mock.ts index aac7c684218aa..49a57fcb2b00d 100644 --- a/x-pack/plugins/event_log/server/es/context.mock.ts +++ b/x-pack/plugins/event_log/server/es/context.mock.ts @@ -18,6 +18,7 @@ const createContextMock = () => { logger: loggingSystemMock.createLogger(), esNames: namesMock.create(), initialize: jest.fn(), + shutdown: jest.fn(), waitTillReady: jest.fn(async () => true), esAdapter: clusterClientAdapterMock.create(), initialized: true, diff --git a/x-pack/plugins/event_log/server/es/context.ts b/x-pack/plugins/event_log/server/es/context.ts index 8c967e68299b5..d7f67620e7968 100644 --- a/x-pack/plugins/event_log/server/es/context.ts +++ b/x-pack/plugins/event_log/server/es/context.ts @@ -18,6 +18,7 @@ export interface EsContext { esNames: EsNames; esAdapter: IClusterClientAdapter; initialize(): void; + shutdown(): Promise; waitTillReady(): Promise; initialized: boolean; } @@ -52,6 +53,7 @@ class EsContextImpl implements EsContext { this.esAdapter = new ClusterClientAdapter({ logger: params.logger, clusterClientPromise: params.clusterClientPromise, + context: this, }); } @@ -74,6 +76,10 @@ class EsContextImpl implements EsContext { }); } + async shutdown() { + await this.esAdapter.shutdown(); + } + // waits till the ES initialization is done, returns true if it was successful, // false if it was not successful async waitTillReady(): Promise { diff --git a/x-pack/plugins/event_log/server/event_logger.test.ts b/x-pack/plugins/event_log/server/event_logger.test.ts index ea699af45ccd2..28b4f5325dcb7 100644 --- a/x-pack/plugins/event_log/server/event_logger.test.ts +++ b/x-pack/plugins/event_log/server/event_logger.test.ts @@ -59,7 +59,8 @@ describe('EventLogger', () => { eventLogger.logEvent({}); await waitForLogEvent(systemLogger); delay(WRITE_LOG_WAIT_MILLIS); // sleep a bit longer since event logging is async - expect(esContext.esAdapter.indexDocument).not.toHaveBeenCalled(); + expect(esContext.esAdapter.indexDocument).toHaveBeenCalled(); + expect(esContext.esAdapter.indexDocuments).not.toHaveBeenCalled(); }); test('method logEvent() writes expected default values', async () => { diff --git a/x-pack/plugins/event_log/server/event_logger.ts b/x-pack/plugins/event_log/server/event_logger.ts index 658d90d809652..db24379bb46ba 100644 --- a/x-pack/plugins/event_log/server/event_logger.ts +++ b/x-pack/plugins/event_log/server/event_logger.ts @@ -20,14 +20,10 @@ import { EventSchema, } from './types'; import { SAVED_OBJECT_REL_PRIMARY } from './types'; +import { Doc } from './es/cluster_client_adapter'; type SystemLogger = Plugin['systemLogger']; -interface Doc { - index: string; - body: IEvent; -} - interface IEventLoggerCtorParams { esContext: EsContext; eventLogService: EventLogService; @@ -159,44 +155,9 @@ function validateEvent(eventLogService: IEventLogService, event: IEvent): IValid export const EVENT_LOGGED_PREFIX = `event logged: `; function logEventDoc(logger: Logger, doc: Doc): void { - setImmediate(() => { - logger.info(`${EVENT_LOGGED_PREFIX}${JSON.stringify(doc.body)}`); - }); + logger.info(`event logged: ${JSON.stringify(doc.body)}`); } function indexEventDoc(esContext: EsContext, doc: Doc): void { - // TODO: - // the setImmediate() on an async function is a little overkill, but, - // setImmediate() may be tweakable via node params, whereas async - // tweaking is in the v8 params realm, which is very dicey. - // Long-term, we should probably create an in-memory queue for this, so - // we can explictly see/set the queue lengths. - - // already verified this.clusterClient isn't null above - setImmediate(async () => { - try { - await indexLogEventDoc(esContext, doc); - } catch (err) { - esContext.logger.warn(`error writing event doc: ${err.message}`); - writeLogEventDocOnError(esContext, doc); - } - }); -} - -// whew, the thing that actually writes the event log document! -async function indexLogEventDoc(esContext: EsContext, doc: unknown) { - esContext.logger.debug(`writing to event log: ${JSON.stringify(doc)}`); - const success = await esContext.waitTillReady(); - if (!success) { - esContext.logger.debug(`event log did not initialize correctly, event not written`); - return; - } - - await esContext.esAdapter.indexDocument(doc); - esContext.logger.debug(`writing to event log complete`); -} - -// TODO: write log entry to a bounded queue buffer -function writeLogEventDocOnError(esContext: EsContext, doc: unknown) { - esContext.logger.warn(`unable to write event doc: ${JSON.stringify(doc)}`); + esContext.esAdapter.indexDocument(doc); } diff --git a/x-pack/plugins/event_log/server/lib/bounded_queue.test.ts b/x-pack/plugins/event_log/server/lib/bounded_queue.test.ts deleted file mode 100644 index b30d83f24f261..0000000000000 --- a/x-pack/plugins/event_log/server/lib/bounded_queue.test.ts +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { createBoundedQueue } from './bounded_queue'; -import { loggingSystemMock } from 'src/core/server/mocks'; - -const loggingService = loggingSystemMock.create(); -const logger = loggingService.get(); - -describe('basic', () => { - let discardedHelper: DiscardedHelper; - let onDiscarded: (object: number) => void; - let queue2: ReturnType; - let queue10: ReturnType; - - beforeAll(() => { - discardedHelper = new DiscardedHelper(); - onDiscarded = discardedHelper.onDiscarded.bind(discardedHelper); - }); - - beforeEach(() => { - queue2 = createBoundedQueue({ logger, maxLength: 2, onDiscarded }); - queue10 = createBoundedQueue({ logger, maxLength: 10, onDiscarded }); - }); - - test('queued items: 0', () => { - discardedHelper.reset(); - expect(queue2.isEmpty()).toEqual(true); - expect(queue2.isFull()).toEqual(false); - expect(queue2.isCloseToFull()).toEqual(false); - expect(queue2.length).toEqual(0); - expect(queue2.maxLength).toEqual(2); - expect(queue2.pull(1)).toEqual([]); - expect(queue2.pull(100)).toEqual([]); - expect(discardedHelper.discarded).toEqual([]); - }); - - test('queued items: 1', () => { - discardedHelper.reset(); - queue2.push(1); - expect(queue2.isEmpty()).toEqual(false); - expect(queue2.isFull()).toEqual(false); - expect(queue2.isCloseToFull()).toEqual(false); - expect(queue2.length).toEqual(1); - expect(queue2.maxLength).toEqual(2); - expect(queue2.pull(1)).toEqual([1]); - expect(queue2.pull(1)).toEqual([]); - expect(discardedHelper.discarded).toEqual([]); - }); - - test('queued items: 2', () => { - discardedHelper.reset(); - queue2.push(1); - queue2.push(2); - expect(queue2.isEmpty()).toEqual(false); - expect(queue2.isFull()).toEqual(true); - expect(queue2.isCloseToFull()).toEqual(true); - expect(queue2.length).toEqual(2); - expect(queue2.maxLength).toEqual(2); - expect(queue2.pull(1)).toEqual([1]); - expect(queue2.pull(1)).toEqual([2]); - expect(queue2.pull(1)).toEqual([]); - expect(discardedHelper.discarded).toEqual([]); - }); - - test('queued items: 3', () => { - discardedHelper.reset(); - queue2.push(1); - queue2.push(2); - queue2.push(3); - expect(queue2.isEmpty()).toEqual(false); - expect(queue2.isFull()).toEqual(true); - expect(queue2.isCloseToFull()).toEqual(true); - expect(queue2.length).toEqual(2); - expect(queue2.maxLength).toEqual(2); - expect(queue2.pull(1)).toEqual([2]); - expect(queue2.pull(1)).toEqual([3]); - expect(queue2.pull(1)).toEqual([]); - expect(discardedHelper.discarded).toEqual([1]); - }); - - test('closeToFull()', () => { - discardedHelper.reset(); - - expect(queue10.isCloseToFull()).toEqual(false); - - for (let i = 1; i <= 8; i++) { - queue10.push(i); - expect(queue10.isCloseToFull()).toEqual(false); - } - - queue10.push(9); - expect(queue10.isCloseToFull()).toEqual(true); - - queue10.push(10); - expect(queue10.isCloseToFull()).toEqual(true); - - queue10.pull(2); - expect(queue10.isCloseToFull()).toEqual(false); - - queue10.push(11); - expect(queue10.isCloseToFull()).toEqual(true); - }); - - test('discarded', () => { - discardedHelper.reset(); - queue2.push(1); - queue2.push(2); - queue2.push(3); - expect(discardedHelper.discarded).toEqual([1]); - - discardedHelper.reset(); - queue2.push(4); - queue2.push(5); - expect(discardedHelper.discarded).toEqual([2, 3]); - }); - - test('pull', () => { - discardedHelper.reset(); - - expect(queue10.pull(4)).toEqual([]); - - for (let i = 1; i <= 10; i++) { - queue10.push(i); - } - - expect(queue10.pull(4)).toEqual([1, 2, 3, 4]); - expect(queue10.length).toEqual(6); - expect(queue10.pull(4)).toEqual([5, 6, 7, 8]); - expect(queue10.length).toEqual(2); - expect(queue10.pull(4)).toEqual([9, 10]); - expect(queue10.length).toEqual(0); - expect(queue10.pull(1)).toEqual([]); - expect(queue10.pull(4)).toEqual([]); - }); -}); - -class DiscardedHelper { - private _discarded: T[]; - - constructor() { - this.reset(); - this._discarded = []; - this.onDiscarded = this.onDiscarded.bind(this); - } - - onDiscarded(object: T) { - this._discarded.push(object); - } - - public get discarded(): T[] { - return this._discarded; - } - - reset() { - this._discarded = []; - } -} diff --git a/x-pack/plugins/event_log/server/lib/bounded_queue.ts b/x-pack/plugins/event_log/server/lib/bounded_queue.ts deleted file mode 100644 index 2c5ebcd38f5a8..0000000000000 --- a/x-pack/plugins/event_log/server/lib/bounded_queue.ts +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { Plugin } from '../plugin'; - -const CLOSE_TO_FULL_PERCENT = 0.9; - -type SystemLogger = Plugin['systemLogger']; - -export interface IBoundedQueue { - maxLength: number; - length: number; - push(object: T): void; - pull(count: number): T[]; - isEmpty(): boolean; - isFull(): boolean; - isCloseToFull(): boolean; -} - -export interface CreateBoundedQueueParams { - maxLength: number; - onDiscarded(object: T): void; - logger: SystemLogger; -} - -export function createBoundedQueue(params: CreateBoundedQueueParams): IBoundedQueue { - if (params.maxLength <= 0) throw new Error(`invalid bounded queue maxLength ${params.maxLength}`); - - return new BoundedQueue(params); -} - -class BoundedQueue implements IBoundedQueue { - private _maxLength: number; - private _buffer: T[]; - private _onDiscarded: (object: T) => void; - private _logger: SystemLogger; - - constructor(params: CreateBoundedQueueParams) { - this._maxLength = params.maxLength; - this._buffer = []; - this._onDiscarded = params.onDiscarded; - this._logger = params.logger; - } - - public get maxLength(): number { - return this._maxLength; - } - - public get length(): number { - return this._buffer.length; - } - - isEmpty() { - return this._buffer.length === 0; - } - - isFull() { - return this._buffer.length >= this._maxLength; - } - - isCloseToFull() { - return this._buffer.length / this._maxLength >= CLOSE_TO_FULL_PERCENT; - } - - push(object: T) { - this.ensureRoom(); - this._buffer.push(object); - } - - pull(count: number) { - if (count <= 0) throw new Error(`invalid pull count ${count}`); - - return this._buffer.splice(0, count); - } - - private ensureRoom() { - if (this.length < this._maxLength) return; - - const discarded = this.pull(this.length - this._maxLength + 1); - for (const object of discarded) { - try { - this._onDiscarded(object!); - } catch (err) { - this._logger.warn(`error discarding circular buffer entry: ${err.message}`); - } - } - } -} diff --git a/x-pack/plugins/event_log/server/lib/ready_signal.ts b/x-pack/plugins/event_log/server/lib/ready_signal.ts index 58879649b83cb..706f3e79cc279 100644 --- a/x-pack/plugins/event_log/server/lib/ready_signal.ts +++ b/x-pack/plugins/event_log/server/lib/ready_signal.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -export interface ReadySignal { +export interface ReadySignal { wait(): Promise; signal(value: T): void; } diff --git a/x-pack/plugins/event_log/server/plugin.test.ts b/x-pack/plugins/event_log/server/plugin.test.ts new file mode 100644 index 0000000000000..e32bda9089701 --- /dev/null +++ b/x-pack/plugins/event_log/server/plugin.test.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { CoreSetup, CoreStart } from 'src/core/server'; +import { coreMock } from 'src/core/server/mocks'; +import { IEventLogService } from './index'; +import { Plugin } from './plugin'; +import { spacesMock } from '../../spaces/server/mocks'; + +describe('event_log plugin', () => { + it('can setup and start', async () => { + const initializerContext = coreMock.createPluginInitializerContext({}); + const coreSetup = coreMock.createSetup() as CoreSetup; + const coreStart = coreMock.createStart() as CoreStart; + + const plugin = new Plugin(initializerContext); + const setup = await plugin.setup(coreSetup); + expect(typeof setup.getLogger).toBe('function'); + expect(typeof setup.getProviderActions).toBe('function'); + expect(typeof setup.isEnabled).toBe('function'); + expect(typeof setup.isIndexingEntries).toBe('function'); + expect(typeof setup.isLoggingEntries).toBe('function'); + expect(typeof setup.isProviderActionRegistered).toBe('function'); + expect(typeof setup.registerProviderActions).toBe('function'); + expect(typeof setup.registerSavedObjectProvider).toBe('function'); + + const spaces = spacesMock.createStart(); + const start = await plugin.start(coreStart, { spaces }); + expect(typeof start.getClient).toBe('function'); + }); + + it('can stop', async () => { + const initializerContext = coreMock.createPluginInitializerContext({}); + const mockLogger = initializerContext.logger.get(); + const coreSetup = coreMock.createSetup() as CoreSetup; + const coreStart = coreMock.createStart() as CoreStart; + + const plugin = new Plugin(initializerContext); + const spaces = spacesMock.createStart(); + await plugin.setup(coreSetup); + await plugin.start(coreStart, { spaces }); + await plugin.stop(); + expect(mockLogger.debug).toBeCalledWith('shutdown: waiting to finish'); + expect(mockLogger.debug).toBeCalledWith('shutdown: finished'); + }); +}); diff --git a/x-pack/plugins/event_log/server/plugin.ts b/x-pack/plugins/event_log/server/plugin.ts index f69850f166aee..d85de565b4d8e 100644 --- a/x-pack/plugins/event_log/server/plugin.ts +++ b/x-pack/plugins/event_log/server/plugin.ts @@ -115,6 +115,18 @@ export class Plugin implements CorePlugin { + if (!success) { + this.systemLogger.error(`initialization failed, events will not be indexed`); + } + }); + // will log the event after initialization this.eventLogger.logEvent({ event: { action: ACTIONS.starting }, @@ -134,18 +146,7 @@ export class Plugin implements CorePlugin, - 'eventLog' - > => { - return async (context, request) => { - return { - getEventLogClient: () => this.eventLogClientService!.getClient(request), - }; - }; - }; - - stop() { + async stop(): Promise { this.systemLogger.debug('stopping plugin'); if (!this.eventLogger) throw new Error('eventLogger not initialized'); @@ -156,5 +157,20 @@ export class Plugin implements CorePlugin, + 'eventLog' + > => { + return async (context, request) => { + return { + getEventLogClient: () => this.eventLogClientService!.getClient(request), + }; + }; + }; } diff --git a/x-pack/test/plugin_api_integration/plugins/event_log/server/init_routes.ts b/x-pack/test/plugin_api_integration/plugins/event_log/server/init_routes.ts index 11af83631502b..95f3770443ccb 100644 --- a/x-pack/test/plugin_api_integration/plugins/event_log/server/init_routes.ts +++ b/x-pack/test/plugin_api_integration/plugins/event_log/server/init_routes.ts @@ -140,33 +140,6 @@ export const getProviderActionsRoute = ( ); }; -export const getLoggerRoute = ( - router: IRouter, - eventLogService: IEventLogService, - logger: Logger -) => { - router.get( - { - path: `/api/log_event_fixture/getEventLogger/{event}`, - validate: { - params: (value: any, { ok }: RouteValidationResultFactory) => ok(value), - }, - }, - async function ( - context: RequestHandlerContext, - req: KibanaRequest, - res: KibanaResponseFactory - ): Promise> { - const { event } = req.params as { event: string }; - logger.info(`test get event logger for event: ${event}`); - - return res.ok({ - body: { eventLogger: eventLogService.getLogger({ event: { provider: event } }) }, - }); - } - ); -}; - export const isIndexingEntriesRoute = ( router: IRouter, eventLogService: IEventLogService, diff --git a/x-pack/test/plugin_api_integration/plugins/event_log/server/plugin.ts b/x-pack/test/plugin_api_integration/plugins/event_log/server/plugin.ts index 4fb0511db2194..94e5e6faa2b43 100644 --- a/x-pack/test/plugin_api_integration/plugins/event_log/server/plugin.ts +++ b/x-pack/test/plugin_api_integration/plugins/event_log/server/plugin.ts @@ -11,7 +11,6 @@ import { registerProviderActionsRoute, isProviderActionRegisteredRoute, getProviderActionsRoute, - getLoggerRoute, isIndexingEntriesRoute, isEventLogServiceLoggingEntriesRoute, isEventLogServiceEnabledRoute, @@ -56,7 +55,6 @@ export class EventLogFixturePlugin registerProviderActionsRoute(router, eventLog, this.logger); isProviderActionRegisteredRoute(router, eventLog, this.logger); getProviderActionsRoute(router, eventLog, this.logger); - getLoggerRoute(router, eventLog, this.logger); isIndexingEntriesRoute(router, eventLog, this.logger); isEventLogServiceLoggingEntriesRoute(router, eventLog, this.logger); isEventLogServiceEnabledRoute(router, eventLog, this.logger); diff --git a/x-pack/test/plugin_api_integration/test_suites/event_log/service_api_integration.ts b/x-pack/test/plugin_api_integration/test_suites/event_log/service_api_integration.ts index 5f827dd3eded6..c246e2945a6dd 100644 --- a/x-pack/test/plugin_api_integration/test_suites/event_log/service_api_integration.ts +++ b/x-pack/test/plugin_api_integration/test_suites/event_log/service_api_integration.ts @@ -79,18 +79,6 @@ export default function ({ getService }: FtrProviderContext) { expect(providerActions.body.actions).to.be.eql(['action1', 'action2']); }); - it('should allow to get event logger event log service', async () => { - const initResult = await isProviderActionRegistered('provider2', 'action1'); - - if (!initResult.body.isProviderActionRegistered) { - await registerProviderActions('provider2', ['action1', 'action2']); - } - const eventLogger = await getEventLogger('provider2'); - expect(eventLogger.body.eventLogger.initialProperties).to.be.eql({ - event: { provider: 'provider2' }, - }); - }); - it('should allow write an event to index document if indexing entries is enabled', async () => { const initResult = await isProviderActionRegistered('provider4', 'action1'); @@ -138,14 +126,6 @@ export default function ({ getService }: FtrProviderContext) { .expect(200); } - async function getEventLogger(event: string) { - log.debug(`isProviderActionRegistered for event ${event}`); - return await supertest - .get(`/api/log_event_fixture/getEventLogger/${event}`) - .set('kbn-xsrf', 'foo') - .expect(200); - } - async function isIndexingEntries() { log.debug(`isIndexingEntries`); return await supertest