From 5ebf2ce417d34701989b4b28c5c117b51617df1b Mon Sep 17 00:00:00 2001 From: Michael Dokolin Date: Thu, 22 Jul 2021 16:44:50 +0200 Subject: [PATCH 1/7] Add readable content stream --- .../server/lib/content_stream.test.ts | 77 +++++++++++++++++++ .../reporting/server/lib/content_stream.ts | 64 +++++++++++++++ x-pack/plugins/reporting/server/lib/index.ts | 1 + 3 files changed, 142 insertions(+) create mode 100644 x-pack/plugins/reporting/server/lib/content_stream.test.ts create mode 100644 x-pack/plugins/reporting/server/lib/content_stream.ts diff --git a/x-pack/plugins/reporting/server/lib/content_stream.test.ts b/x-pack/plugins/reporting/server/lib/content_stream.test.ts new file mode 100644 index 0000000000000..ab4e49c4d5d29 --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/content_stream.test.ts @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { set } from 'lodash'; +import { ElasticsearchClient } from 'src/core/server'; +import { elasticsearchServiceMock } from 'src/core/server/mocks'; +import { ContentStream } from './content_stream'; + +describe('ContentStream', () => { + let client: jest.Mocked; + let stream: ContentStream; + + beforeEach(() => { + client = elasticsearchServiceMock.createClusterClient().asInternalUser; + stream = new ContentStream(client, { id: 'something', index: 'somewhere' }); + + client.search.mockResolvedValue( + set({}, 'body.hits.hits.0._source.output.content', 'some content') + ); + }); + + describe('read', () => { + it('should perform a search using index and the document id', async () => { + await new Promise((resolve) => stream.once('data', resolve)); + + expect(client.search).toHaveBeenCalledTimes(1); + + const [[request]] = client.search.mock.calls; + expect(request).toHaveProperty('index', 'somewhere'); + expect(request).toHaveProperty( + 'body.query.constant_score.filter.bool.must.0.term._id', + 'something' + ); + }); + + it('should read the document contents', async () => { + const data = await new Promise((resolve) => stream.once('data', resolve)); + + expect(data).toEqual(Buffer.from('some content')); + }); + + it('should be an empty stream on empty response', async () => { + client.search.mockResolvedValueOnce({ body: {} } as any); + const onData = jest.fn(); + + stream.on('data', onData); + await new Promise((resolve) => stream.once('end', resolve)); + + expect(onData).not.toHaveBeenCalled(); + }); + + it('should emit an error event', async () => { + client.search.mockRejectedValueOnce('some error'); + + stream.read(); + const error = await new Promise((resolve) => stream.once('error', resolve)); + + expect(error).toBe('some error'); + }); + }); + + describe('toString', () => { + it('should return the document contents', async () => { + await expect(stream.toString()).resolves.toBe('some content'); + }); + + it('should return an empty string for the empty document', async () => { + client.search.mockResolvedValueOnce({ body: {} } as any); + + await expect(stream.toString()).resolves.toBe(''); + }); + }); +}); diff --git a/x-pack/plugins/reporting/server/lib/content_stream.ts b/x-pack/plugins/reporting/server/lib/content_stream.ts new file mode 100644 index 0000000000000..2815455ffe16d --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/content_stream.ts @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Readable } from 'stream'; +import type { ElasticsearchClient } from 'src/core/server'; +import { ReportDocument } from '../../common/types'; + +type SearchRequest = Required>[0]; + +export interface ContentStreamDocument { + id: string; + index: string; +} + +export class ContentStream extends Readable { + constructor(private client: ElasticsearchClient, private document: ContentStreamDocument) { + super(); + } + + async _read() { + const { id, index } = this.document; + const body: SearchRequest['body'] = { + _source: { includes: ['output.content'] }, + query: { + constant_score: { + filter: { + bool: { + must: [{ term: { _id: id } }], + }, + }, + }, + }, + size: 1, + }; + + try { + const response = await this.client.search({ body, index }); + const hits = response?.body.hits?.hits?.[0] as ReportDocument | undefined; + const output = hits?._source.output?.content; + + if (output != null) { + this.push(output); + } + + this.push(null); + } catch (error) { + this.destroy(error); + } + } + + async toString(): Promise { + let result = ''; + + for await (const chunk of this) { + result += chunk; + } + + return result; + } +} diff --git a/x-pack/plugins/reporting/server/lib/index.ts b/x-pack/plugins/reporting/server/lib/index.ts index 37f57d97d3d4c..07e8682500014 100644 --- a/x-pack/plugins/reporting/server/lib/index.ts +++ b/x-pack/plugins/reporting/server/lib/index.ts @@ -7,6 +7,7 @@ export { checkLicense } from './check_license'; export { checkParamsVersion } from './check_params_version'; +export { ContentStream } from './content_stream'; export { cryptoFactory } from './crypto'; export { ExportTypesRegistry, getExportTypesRegistry } from './export_types_registry'; export { LevelLogger } from './level_logger'; From 57649190b73e214f60a61b0b70de12b497407885 Mon Sep 17 00:00:00 2001 From: Michael Dokolin Date: Thu, 22 Jul 2021 16:48:33 +0200 Subject: [PATCH 2/7] Add content stream factory --- .../server/lib/content_stream_factory.ts | 21 +++++++++++++++++++ x-pack/plugins/reporting/server/lib/index.ts | 1 + 2 files changed, 22 insertions(+) create mode 100644 x-pack/plugins/reporting/server/lib/content_stream_factory.ts diff --git a/x-pack/plugins/reporting/server/lib/content_stream_factory.ts b/x-pack/plugins/reporting/server/lib/content_stream_factory.ts new file mode 100644 index 0000000000000..aeb58ce51500a --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/content_stream_factory.ts @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ReportingCore } from '..'; +import { ContentStream, ContentStreamDocument } from './content_stream'; + +export function getContentStreamFactory(reporting: ReportingCore) { + async function getClient() { + const { asInternalUser: client } = await reporting.getEsClient(); + + return client; + } + + return async function getContentStream(document: ContentStreamDocument) { + return new ContentStream(await getClient(), document); + }; +} diff --git a/x-pack/plugins/reporting/server/lib/index.ts b/x-pack/plugins/reporting/server/lib/index.ts index 07e8682500014..effd74d9c0379 100644 --- a/x-pack/plugins/reporting/server/lib/index.ts +++ b/x-pack/plugins/reporting/server/lib/index.ts @@ -8,6 +8,7 @@ export { checkLicense } from './check_license'; export { checkParamsVersion } from './check_params_version'; export { ContentStream } from './content_stream'; +export { getContentStreamFactory } from './content_stream_factory'; export { cryptoFactory } from './crypto'; export { ExportTypesRegistry, getExportTypesRegistry } from './export_types_registry'; export { LevelLogger } from './level_logger'; From f71bf3815fef0af435d31aceca20ca8430d302d7 Mon Sep 17 00:00:00 2001 From: Michael Dokolin Date: Thu, 22 Jul 2021 17:02:33 +0200 Subject: [PATCH 3/7] Move report contents gathering to the content stream --- x-pack/plugins/reporting/common/types.ts | 6 -- .../reporting/server/routes/jobs.test.ts | 20 ++++--- .../server/routes/lib/get_document_payload.ts | 43 +++++++++----- .../server/routes/lib/job_response_handler.ts | 7 +-- .../reporting/server/routes/lib/jobs_query.ts | 57 +------------------ 5 files changed, 46 insertions(+), 87 deletions(-) diff --git a/x-pack/plugins/reporting/common/types.ts b/x-pack/plugins/reporting/common/types.ts index f3a0e9192cf7d..ba200ccd1ffab 100644 --- a/x-pack/plugins/reporting/common/types.ts +++ b/x-pack/plugins/reporting/common/types.ts @@ -120,12 +120,6 @@ export type JobStatus = | 'processing' // Report job has been claimed and is executing | 'failed'; // Report was not successful, and all retries are done. Nothing to download. -// payload for retrieving the error message of a failed job -export interface JobContent { - content: TaskRunResult['content']; - content_type: false; -} - /* * Info API response: to avoid unnecessary large payloads on a network, the * report query results do not include `payload.headers` or `output.content`, diff --git a/x-pack/plugins/reporting/server/routes/jobs.test.ts b/x-pack/plugins/reporting/server/routes/jobs.test.ts index 3040ea351f7d0..a3c1995fee1dc 100644 --- a/x-pack/plugins/reporting/server/routes/jobs.test.ts +++ b/x-pack/plugins/reporting/server/routes/jobs.test.ts @@ -5,6 +5,10 @@ * 2.0. */ +jest.mock('../lib/content_stream_factory', () => ({ + getContentStreamFactory: jest.fn(), +})); + import { UnwrapPromise } from '@kbn/utility-types'; import type { DeeplyMockedKeys } from '@kbn/utility-types/jest'; import { of } from 'rxjs'; @@ -13,7 +17,7 @@ import { setupServer } from 'src/core/server/test_utils'; import supertest from 'supertest'; import { ReportingCore } from '..'; import { ReportingInternalSetup } from '../core'; -import { ExportTypesRegistry } from '../lib/export_types_registry'; +import { ContentStream, ExportTypesRegistry, getContentStreamFactory } from '../lib'; import { createMockConfigSchema, createMockPluginSetup, @@ -32,6 +36,7 @@ describe('GET /api/reporting/jobs/download', () => { let core: ReportingCore; let mockSetupDeps: ReportingInternalSetup; let mockEsClient: DeeplyMockedKeys; + let stream: jest.Mocked; const getHits = (...sources: any) => { return { @@ -93,6 +98,11 @@ describe('GET /api/reporting/jobs/download', () => { core.getExportTypesRegistry = () => exportTypesRegistry; mockEsClient = (await core.getEsClient()).asInternalUser as typeof mockEsClient; + stream = ({ toString: jest.fn(() => 'test') } as unknown) as typeof stream; + + (getContentStreamFactory as jest.MockedFunction< + typeof getContentStreamFactory + >).mockReturnValue(async () => stream); }); afterEach(async () => { @@ -188,10 +198,10 @@ describe('GET /api/reporting/jobs/download', () => { body: getHits({ jobtype: 'unencodedJobType', status: 'failed', - output: { content: 'job failure message' }, payload: { title: 'failing job!' }, }), } as any); + stream.toString.mockResolvedValueOnce('job failure message'); registerJobInfoRoutes(core); await server.start(); @@ -207,14 +217,13 @@ describe('GET /api/reporting/jobs/download', () => { describe('successful downloads', () => { const getCompleteHits = ({ jobType = 'unencodedJobType', - outputContent = 'job output content', outputContentType = 'text/plain', title = '', } = {}) => { return getHits({ jobtype: jobType, status: 'completed', - output: { content: outputContent, content_type: outputContentType }, + output: { content_type: outputContentType }, payload: { title }, }); }; @@ -252,7 +261,6 @@ describe('GET /api/reporting/jobs/download', () => { mockEsClient.search.mockResolvedValueOnce({ body: getCompleteHits({ jobType: 'unencodedJobType', - outputContent: 'test', }), } as any); registerJobInfoRoutes(core); @@ -269,7 +277,6 @@ describe('GET /api/reporting/jobs/download', () => { mockEsClient.search.mockResolvedValueOnce({ body: getCompleteHits({ jobType: 'base64EncodedJobType', - outputContent: 'test', outputContentType: 'application/pdf', }), } as any); @@ -288,7 +295,6 @@ describe('GET /api/reporting/jobs/download', () => { mockEsClient.search.mockResolvedValueOnce({ body: getCompleteHits({ jobType: 'unencodedJobType', - outputContent: 'alert("all your base mine now");', outputContentType: 'application/html', }), } as any); diff --git a/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts b/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts index 2141252c70bfa..9c37589ebef46 100644 --- a/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts +++ b/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts @@ -8,10 +8,10 @@ // @ts-ignore import contentDisposition from 'content-disposition'; import { CSV_JOB_TYPE, CSV_JOB_TYPE_DEPRECATED } from '../../../common/constants'; -import { ExportTypesRegistry, statuses } from '../../lib'; -import { TaskRunResult } from '../../lib/tasks'; +import { ReportApiJSON } from '../../../common/types'; +import { ReportingCore } from '../../'; +import { getContentStreamFactory, statuses } from '../../lib'; import { ExportTypeDefinition } from '../../types'; -import { ReportContent } from './jobs_query'; export interface ErrorFromPayload { message: string; @@ -25,6 +25,8 @@ interface Payload { headers: Record; } +type TaskRunResult = Required['output']; + const DEFAULT_TITLE = 'report'; const getTitle = (exportType: ExportTypeDefinition, title?: string): string => @@ -44,7 +46,10 @@ const getReportingHeaders = (output: TaskRunResult, exportType: ExportTypeDefini return metaDataHeaders; }; -export function getDocumentPayloadFactory(exportTypesRegistry: ExportTypesRegistry) { +export function getDocumentPayloadFactory(reporting: ReportingCore) { + const exportTypesRegistry = reporting.getExportTypesRegistry(); + const getContentStream = getContentStreamFactory(reporting); + function encodeContent( content: string | null, exportType: ExportTypeDefinition @@ -57,7 +62,12 @@ export function getDocumentPayloadFactory(exportTypesRegistry: ExportTypesRegist } } - function getCompleted(output: TaskRunResult, jobType: string, title: string): Payload { + async function getCompleted( + output: TaskRunResult, + jobType: string, + title: string, + content: string + ): Promise { const exportType = exportTypesRegistry.get( (item: ExportTypeDefinition) => item.jobType === jobType ); @@ -66,7 +76,7 @@ export function getDocumentPayloadFactory(exportTypesRegistry: ExportTypesRegist return { statusCode: 200, - content: encodeContent(output.content, exportType), + content: encodeContent(content, exportType), contentType: output.content_type, headers: { ...headers, @@ -77,11 +87,11 @@ export function getDocumentPayloadFactory(exportTypesRegistry: ExportTypesRegist // @TODO: These should be semantic HTTP codes as 500/503's indicate // error then these are really operating properly. - function getFailure(output: TaskRunResult): Payload { + function getFailure(content: string): Payload { return { statusCode: 500, content: { - message: `Reporting generation failed: ${output.content}`, + message: `Reporting generation failed: ${content}`, }, contentType: 'application/json', headers: {}, @@ -97,19 +107,24 @@ export function getDocumentPayloadFactory(exportTypesRegistry: ExportTypesRegist }; } - return function getDocumentPayload({ + return async function getDocumentPayload({ + id, + index, + output, status, jobtype: jobType, - payload: { title } = { title: 'unknown' }, - output, - }: ReportContent): Payload { + payload: { title }, + }: ReportApiJSON): Promise { if (output) { + const stream = await getContentStream({ id, index }); + const content = await stream.toString(); + if (status === statuses.JOB_STATUS_COMPLETED || status === statuses.JOB_STATUS_WARNINGS) { - return getCompleted(output, jobType, title); + return getCompleted(output, jobType, title, content); } if (status === statuses.JOB_STATUS_FAILED) { - return getFailure(output); + return getFailure(content); } } diff --git a/x-pack/plugins/reporting/server/routes/lib/job_response_handler.ts b/x-pack/plugins/reporting/server/routes/lib/job_response_handler.ts index f9519f74060f9..9f90be09bccbc 100644 --- a/x-pack/plugins/reporting/server/routes/lib/job_response_handler.ts +++ b/x-pack/plugins/reporting/server/routes/lib/job_response_handler.ts @@ -22,8 +22,7 @@ interface JobResponseHandlerOpts { export function downloadJobResponseHandlerFactory(reporting: ReportingCore) { const jobsQuery = jobsQueryFactory(reporting); - const exportTypesRegistry = reporting.getExportTypesRegistry(); - const getDocumentPayload = getDocumentPayloadFactory(exportTypesRegistry); + const getDocumentPayload = getDocumentPayloadFactory(reporting); return async function jobResponseHandler( res: typeof kibanaResponseFactory, @@ -35,7 +34,7 @@ export function downloadJobResponseHandlerFactory(reporting: ReportingCore) { try { const { docId } = params; - const doc = await jobsQuery.getContent(user, docId); + const doc = await jobsQuery.get(user, docId); if (!doc) { return res.notFound(); } @@ -46,7 +45,7 @@ export function downloadJobResponseHandlerFactory(reporting: ReportingCore) { }); } - const payload = getDocumentPayload(doc); + const payload = await getDocumentPayload(doc); if (!payload.contentType || !ALLOWED_JOB_CONTENT_TYPES.includes(payload.contentType)) { return res.badRequest({ diff --git a/x-pack/plugins/reporting/server/routes/lib/jobs_query.ts b/x-pack/plugins/reporting/server/routes/lib/jobs_query.ts index a0d6962074a70..3e33124b1ed0f 100644 --- a/x-pack/plugins/reporting/server/routes/lib/jobs_query.ts +++ b/x-pack/plugins/reporting/server/routes/lib/jobs_query.ts @@ -12,8 +12,7 @@ import { i18n } from '@kbn/i18n'; import { UnwrapPromise } from '@kbn/utility-types'; import { ElasticsearchClient } from 'src/core/server'; import { ReportingCore } from '../../'; -import { JobContent, ReportApiJSON, ReportDocument, ReportSource } from '../../../common/types'; -import { statuses } from '../../lib/statuses'; +import { ReportApiJSON, ReportSource } from '../../../common/types'; import { Report } from '../../lib/store'; import { ReportingUser } from '../../types'; @@ -47,8 +46,6 @@ interface JobsQueryFactory { ): Promise; count(jobTypes: string[], user: ReportingUser): Promise; get(user: ReportingUser, id: string): Promise; - getContent(user: ReportingUser, id: string): Promise; - getError(user: ReportingUser, id: string): Promise<(ReportContent & JobContent) | void>; delete(deleteIndex: string, id: string): Promise>; } @@ -169,58 +166,6 @@ export function jobsQueryFactory(reportingCore: ReportingCore): JobsQueryFactory return report.toApiJSON(); }, - async getContent(user, id) { - if (!id) { - return; - } - - const username = getUsername(user); - const body: SearchRequest['body'] = { - _source: { excludes: ['payload.headers'] }, - query: { - constant_score: { - filter: { - bool: { - must: [{ term: { _id: id } }, { term: { created_by: username } }], - }, - }, - }, - }, - size: 1, - }; - - const response = await execQuery((elasticsearchClient) => - elasticsearchClient.search({ body, index: getIndex() }) - ); - - if (response?.body.hits?.hits?.length !== 1) { - return; - } - - const report = response.body.hits.hits[0] as ReportDocument; - - return { - status: report._source.status, - jobtype: report._source.jobtype, - output: report._source.output, - payload: report._source.payload, - }; - }, - - async getError(user, id) { - const content = await this.getContent(user, id); - if (content && content?.output?.content) { - if (content.status !== statuses.JOB_STATUS_FAILED) { - throw new Error(`Can not get error for ${id}`); - } - return { - ...content, - content: content.output.content, - content_type: false, - }; - } - }, - async delete(deleteIndex, id) { try { const { asInternalUser: elasticsearchClient } = await reportingCore.getEsClient(); From 46ab6ddb25ab52ea86038084f6c563eb29aa1f16 Mon Sep 17 00:00:00 2001 From: Michael Dokolin Date: Mon, 26 Jul 2021 16:48:12 +0200 Subject: [PATCH 4/7] Add writeable content stream --- .../server/lib/content_stream.test.ts | 47 +++++++++++++++++++ .../reporting/server/lib/content_stream.ts | 45 +++++++++++++++++- 2 files changed, 90 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/reporting/server/lib/content_stream.test.ts b/x-pack/plugins/reporting/server/lib/content_stream.test.ts index ab4e49c4d5d29..793fcc9c8a425 100644 --- a/x-pack/plugins/reporting/server/lib/content_stream.test.ts +++ b/x-pack/plugins/reporting/server/lib/content_stream.test.ts @@ -74,4 +74,51 @@ describe('ContentStream', () => { await expect(stream.toString()).resolves.toBe(''); }); }); + + describe('write', () => { + it('should not send a request until stream is closed', () => { + stream.write('something'); + + expect(client.update).not.toHaveBeenCalled(); + }); + + it('should send the contents when stream ends', async () => { + stream.write('123'); + stream.write('456'); + stream.end(); + await new Promise((resolve) => stream.once('finish', resolve)); + + expect(client.update).toHaveBeenCalledTimes(1); + + const [[request]] = client.update.mock.calls; + + expect(request).toHaveProperty('id', 'something'); + expect(request).toHaveProperty('index', 'somewhere'); + expect(request).toHaveProperty('body.doc.output.content', '123456'); + }); + + it('should emit an error event', async () => { + client.update.mockRejectedValueOnce('some error'); + + stream.end('data'); + const error = await new Promise((resolve) => stream.once('error', resolve)); + + expect(error).toBe('some error'); + }); + + it('should update _seq_no and _primary_term from the response', async () => { + client.update.mockResolvedValueOnce({ + body: { + _primary_term: 1, + _seq_no: 10, + }, + } as any); + + stream.end('something'); + await new Promise((resolve) => stream.once('finish', resolve)); + + expect(stream.getPrimaryTerm()).toBe(1); + expect(stream.getSeqNo()).toBe(10); + }); + }); }); diff --git a/x-pack/plugins/reporting/server/lib/content_stream.ts b/x-pack/plugins/reporting/server/lib/content_stream.ts index 2815455ffe16d..509e7803e9eaf 100644 --- a/x-pack/plugins/reporting/server/lib/content_stream.ts +++ b/x-pack/plugins/reporting/server/lib/content_stream.ts @@ -5,18 +5,25 @@ * 2.0. */ -import { Readable } from 'stream'; +import { Duplex } from 'stream'; import type { ElasticsearchClient } from 'src/core/server'; import { ReportDocument } from '../../common/types'; +type Callback = (error?: Error) => void; type SearchRequest = Required>[0]; export interface ContentStreamDocument { id: string; index: string; + if_primary_term?: number; + if_seq_no?: number; } -export class ContentStream extends Readable { +export class ContentStream extends Duplex { + private buffer = ''; + private primaryTerm?: number; + private seqNo?: number; + constructor(private client: ElasticsearchClient, private document: ContentStreamDocument) { super(); } @@ -52,6 +59,32 @@ export class ContentStream extends Readable { } } + _write(chunk: Buffer | string, _encoding: string, callback: Callback) { + this.buffer += typeof chunk === 'string' ? chunk : chunk.toString(); + callback(); + } + + async _final(callback: Callback) { + try { + const { body } = await this.client.update({ + ...this.document, + body: { + doc: { + output: { + content: this.buffer, + }, + }, + }, + }); + + ({ _primary_term: this.primaryTerm, _seq_no: this.seqNo } = body); + this.buffer = ''; + callback(); + } catch (error) { + callback(error); + } + } + async toString(): Promise { let result = ''; @@ -61,4 +94,12 @@ export class ContentStream extends Readable { return result; } + + getSeqNo(): number | undefined { + return this.seqNo; + } + + getPrimaryTerm(): number | undefined { + return this.primaryTerm; + } } From 23f4e292783b47334790404190dfd7ad8655a28a Mon Sep 17 00:00:00 2001 From: Michael Dokolin Date: Wed, 28 Jul 2021 20:56:12 +0200 Subject: [PATCH 5/7] Move report contents writing to the content stream --- .../reporting/server/lib/store/store.ts | 6 +++--- .../server/lib/tasks/execute_report.ts | 18 ++++++++++++++++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/x-pack/plugins/reporting/server/lib/store/store.ts b/x-pack/plugins/reporting/server/lib/store/store.ts index 40a73f294c5a9..c8c5f82b59039 100644 --- a/x-pack/plugins/reporting/server/lib/store/store.ts +++ b/x-pack/plugins/reporting/server/lib/store/store.ts @@ -9,7 +9,7 @@ import { IndexResponse, UpdateResponse } from '@elastic/elasticsearch/api/types' import { ElasticsearchClient } from 'src/core/server'; import { LevelLogger, statuses } from '../'; import { ReportingCore } from '../../'; -import { JobStatus } from '../../../common/types'; +import { JobStatus, TaskRunResult } from '../../../common/types'; import { ILM_POLICY_NAME } from '../../../common/constants'; @@ -41,7 +41,7 @@ export type ReportFailedFields = Required<{ export type ReportCompletedFields = Required<{ completed_at: Report['completed_at']; - output: Report['output']; + output: Omit | null; }>; /* @@ -354,7 +354,7 @@ export class ReportingStore { const doc = sourceDoc({ ...completedInfo, status, - }); + } as ReportSource); try { checkReportIsEditable(report); diff --git a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts index a52f452436d6d..1fb07547fa903 100644 --- a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts +++ b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts @@ -5,11 +5,13 @@ * 2.0. */ +import { Readable, pipeline } from 'stream'; +import { promisify } from 'util'; import { UpdateResponse } from '@elastic/elasticsearch/api/types'; import moment from 'moment'; import * as Rx from 'rxjs'; import { timeout } from 'rxjs/operators'; -import { LevelLogger } from '../'; +import { LevelLogger, getContentStreamFactory } from '../'; import { ReportingCore } from '../../'; import { RunContext, @@ -55,6 +57,7 @@ export class ExecuteReportTask implements ReportingTask { private kibanaId?: string; private kibanaName?: string; private store?: ReportingStore; + private getContentStream: ReturnType; constructor( private reporting: ReportingCore, @@ -62,6 +65,7 @@ export class ExecuteReportTask implements ReportingTask { logger: LevelLogger ) { this.logger = logger.clone(['runTask']); + this.getContentStream = getContentStreamFactory(reporting); } /* @@ -251,7 +255,17 @@ export class ExecuteReportTask implements ReportingTask { this.logger.debug(`Saving ${report.jobtype} to ${docId}.`); const completedTime = moment().toISOString(); - const docOutput = this._formatOutput(output); + const { content, ...docOutput } = this._formatOutput(output); + const stream = await this.getContentStream({ + id: report._id, + index: report._index!, + if_primary_term: report._primary_term, + if_seq_no: report._seq_no, + }); + + await promisify(pipeline)(Readable.from(content ?? ''), stream); + report._seq_no = stream.getSeqNo(); + report._primary_term = stream.getPrimaryTerm(); const store = await this.getStore(); const doc = { From 48b2937d3f1954003605bfcba9810de59b212088 Mon Sep 17 00:00:00 2001 From: Michael Dokolin Date: Fri, 30 Jul 2021 20:48:47 +0200 Subject: [PATCH 6/7] Refactor content stream factory to get rid of currying --- .../reporting/server/lib/content_stream.ts | 9 +++++++- .../server/lib/content_stream_factory.ts | 21 ------------------- x-pack/plugins/reporting/server/lib/index.ts | 3 +-- .../server/lib/tasks/execute_report.ts | 6 ++---- .../reporting/server/routes/jobs.test.ts | 10 ++++----- .../server/routes/lib/get_document_payload.ts | 5 ++--- 6 files changed, 17 insertions(+), 37 deletions(-) delete mode 100644 x-pack/plugins/reporting/server/lib/content_stream_factory.ts diff --git a/x-pack/plugins/reporting/server/lib/content_stream.ts b/x-pack/plugins/reporting/server/lib/content_stream.ts index 509e7803e9eaf..5f95394abfb57 100644 --- a/x-pack/plugins/reporting/server/lib/content_stream.ts +++ b/x-pack/plugins/reporting/server/lib/content_stream.ts @@ -7,12 +7,13 @@ import { Duplex } from 'stream'; import type { ElasticsearchClient } from 'src/core/server'; +import { ReportingCore } from '..'; import { ReportDocument } from '../../common/types'; type Callback = (error?: Error) => void; type SearchRequest = Required>[0]; -export interface ContentStreamDocument { +interface ContentStreamDocument { id: string; index: string; if_primary_term?: number; @@ -103,3 +104,9 @@ export class ContentStream extends Duplex { return this.primaryTerm; } } + +export async function getContentStream(reporting: ReportingCore, document: ContentStreamDocument) { + const { asInternalUser: client } = await reporting.getEsClient(); + + return new ContentStream(client, document); +} diff --git a/x-pack/plugins/reporting/server/lib/content_stream_factory.ts b/x-pack/plugins/reporting/server/lib/content_stream_factory.ts deleted file mode 100644 index aeb58ce51500a..0000000000000 --- a/x-pack/plugins/reporting/server/lib/content_stream_factory.ts +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { ReportingCore } from '..'; -import { ContentStream, ContentStreamDocument } from './content_stream'; - -export function getContentStreamFactory(reporting: ReportingCore) { - async function getClient() { - const { asInternalUser: client } = await reporting.getEsClient(); - - return client; - } - - return async function getContentStream(document: ContentStreamDocument) { - return new ContentStream(await getClient(), document); - }; -} diff --git a/x-pack/plugins/reporting/server/lib/index.ts b/x-pack/plugins/reporting/server/lib/index.ts index effd74d9c0379..cc61f5eb96616 100644 --- a/x-pack/plugins/reporting/server/lib/index.ts +++ b/x-pack/plugins/reporting/server/lib/index.ts @@ -7,8 +7,7 @@ export { checkLicense } from './check_license'; export { checkParamsVersion } from './check_params_version'; -export { ContentStream } from './content_stream'; -export { getContentStreamFactory } from './content_stream_factory'; +export { ContentStream, getContentStream } from './content_stream'; export { cryptoFactory } from './crypto'; export { ExportTypesRegistry, getExportTypesRegistry } from './export_types_registry'; export { LevelLogger } from './level_logger'; diff --git a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts index 1fb07547fa903..b0c8978a02f6a 100644 --- a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts +++ b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts @@ -11,7 +11,7 @@ import { UpdateResponse } from '@elastic/elasticsearch/api/types'; import moment from 'moment'; import * as Rx from 'rxjs'; import { timeout } from 'rxjs/operators'; -import { LevelLogger, getContentStreamFactory } from '../'; +import { LevelLogger, getContentStream } from '../'; import { ReportingCore } from '../../'; import { RunContext, @@ -57,7 +57,6 @@ export class ExecuteReportTask implements ReportingTask { private kibanaId?: string; private kibanaName?: string; private store?: ReportingStore; - private getContentStream: ReturnType; constructor( private reporting: ReportingCore, @@ -65,7 +64,6 @@ export class ExecuteReportTask implements ReportingTask { logger: LevelLogger ) { this.logger = logger.clone(['runTask']); - this.getContentStream = getContentStreamFactory(reporting); } /* @@ -256,7 +254,7 @@ export class ExecuteReportTask implements ReportingTask { const completedTime = moment().toISOString(); const { content, ...docOutput } = this._formatOutput(output); - const stream = await this.getContentStream({ + const stream = await getContentStream(this.reporting, { id: report._id, index: report._index!, if_primary_term: report._primary_term, diff --git a/x-pack/plugins/reporting/server/routes/jobs.test.ts b/x-pack/plugins/reporting/server/routes/jobs.test.ts index a3c1995fee1dc..b666f0a2f394d 100644 --- a/x-pack/plugins/reporting/server/routes/jobs.test.ts +++ b/x-pack/plugins/reporting/server/routes/jobs.test.ts @@ -5,8 +5,8 @@ * 2.0. */ -jest.mock('../lib/content_stream_factory', () => ({ - getContentStreamFactory: jest.fn(), +jest.mock('../lib/content_stream', () => ({ + getContentStream: jest.fn(), })); import { UnwrapPromise } from '@kbn/utility-types'; @@ -17,7 +17,7 @@ import { setupServer } from 'src/core/server/test_utils'; import supertest from 'supertest'; import { ReportingCore } from '..'; import { ReportingInternalSetup } from '../core'; -import { ContentStream, ExportTypesRegistry, getContentStreamFactory } from '../lib'; +import { ContentStream, ExportTypesRegistry, getContentStream } from '../lib'; import { createMockConfigSchema, createMockPluginSetup, @@ -100,9 +100,7 @@ describe('GET /api/reporting/jobs/download', () => { mockEsClient = (await core.getEsClient()).asInternalUser as typeof mockEsClient; stream = ({ toString: jest.fn(() => 'test') } as unknown) as typeof stream; - (getContentStreamFactory as jest.MockedFunction< - typeof getContentStreamFactory - >).mockReturnValue(async () => stream); + (getContentStream as jest.MockedFunction).mockResolvedValue(stream); }); afterEach(async () => { diff --git a/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts b/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts index 9c37589ebef46..869eb93bd3b42 100644 --- a/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts +++ b/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts @@ -10,7 +10,7 @@ import contentDisposition from 'content-disposition'; import { CSV_JOB_TYPE, CSV_JOB_TYPE_DEPRECATED } from '../../../common/constants'; import { ReportApiJSON } from '../../../common/types'; import { ReportingCore } from '../../'; -import { getContentStreamFactory, statuses } from '../../lib'; +import { getContentStream, statuses } from '../../lib'; import { ExportTypeDefinition } from '../../types'; export interface ErrorFromPayload { @@ -48,7 +48,6 @@ const getReportingHeaders = (output: TaskRunResult, exportType: ExportTypeDefini export function getDocumentPayloadFactory(reporting: ReportingCore) { const exportTypesRegistry = reporting.getExportTypesRegistry(); - const getContentStream = getContentStreamFactory(reporting); function encodeContent( content: string | null, @@ -116,7 +115,7 @@ export function getDocumentPayloadFactory(reporting: ReportingCore) { payload: { title }, }: ReportApiJSON): Promise { if (output) { - const stream = await getContentStream({ id, index }); + const stream = await getContentStream(reporting, { id, index }); const content = await stream.toString(); if (status === statuses.JOB_STATUS_COMPLETED || status === statuses.JOB_STATUS_WARNINGS) { From 7e3a4491eeaf8567b55947b9da85201db6c34d39 Mon Sep 17 00:00:00 2001 From: Michael Dokolin Date: Tue, 3 Aug 2021 23:10:27 +0200 Subject: [PATCH 7/7] Update jobs executors to use content stream instead of returning report contents --- x-pack/plugins/reporting/common/types.ts | 8 +- .../export_types/csv/execute_job.test.ts | 134 +++++++++++------- .../server/export_types/csv/execute_job.ts | 8 +- .../export_types/csv/generate_csv/index.ts | 12 +- .../server/export_types/csv/types.d.ts | 1 - .../csv_searchsource/execute_job.test.ts | 16 ++- .../csv_searchsource/execute_job.ts | 12 +- .../generate_csv/generate_csv.test.ts | 81 +++++++---- .../generate_csv/generate_csv.ts | 7 +- .../max_size_string_builder.test.ts | 71 +++++----- .../generate_csv/max_size_string_builder.ts | 43 +++--- .../csv_searchsource_immediate/execute_job.ts | 14 +- .../png/execute_job/index.test.ts | 17 ++- .../export_types/png/execute_job/index.ts | 6 +- .../printable_pdf/execute_job/index.test.ts | 17 ++- .../printable_pdf/execute_job/index.ts | 14 +- .../reporting/server/lib/store/store.ts | 6 +- .../server/lib/tasks/execute_report.ts | 45 +++--- .../routes/csv_searchsource_immediate.ts | 17 ++- x-pack/plugins/reporting/server/types.ts | 4 +- 20 files changed, 322 insertions(+), 211 deletions(-) diff --git a/x-pack/plugins/reporting/common/types.ts b/x-pack/plugins/reporting/common/types.ts index ba200ccd1ffab..7bcf69b564b3c 100644 --- a/x-pack/plugins/reporting/common/types.ts +++ b/x-pack/plugins/reporting/common/types.ts @@ -44,7 +44,7 @@ export interface ReportDocumentHead { _primary_term: number; } -export interface TaskRunResult { +export interface ReportOutput { content_type: string | null; content: string | null; size: number; @@ -53,6 +53,8 @@ export interface TaskRunResult { warnings?: string[]; } +export type TaskRunResult = Omit; + export interface ReportSource { /* * Required fields: populated in enqueue_job when the request comes in to @@ -73,7 +75,7 @@ export interface ReportSource { /* * `output` is only populated if the report job is completed or failed. */ - output: TaskRunResult | null; + output: ReportOutput | null; /* * Optional fields: populated when the job is claimed to execute, and after @@ -127,7 +129,7 @@ export type JobStatus = */ interface ReportSimple extends Omit { payload: Omit; - output?: Omit; // is undefined for report jobs that are not completed + output?: Omit; // is undefined for report jobs that are not completed } /* diff --git a/x-pack/plugins/reporting/server/export_types/csv/execute_job.test.ts b/x-pack/plugins/reporting/server/export_types/csv/execute_job.test.ts index 0e8a7016b853b..65b53b3b77eb4 100644 --- a/x-pack/plugins/reporting/server/export_types/csv/execute_job.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv/execute_job.test.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import type { DeeplyMockedKeys } from '@kbn/utility-types/jest'; import nodeCrypto from '@elastic/node-crypto'; import { ElasticsearchClient, IUiSettingsClient } from 'kibana/server'; @@ -56,6 +57,8 @@ describe('CSV Execute Job', function () { let mockReportingConfig: ReportingConfig; let mockReportingCore: ReportingCore; let cancellationToken: any; + let stream: jest.Mocked; + let content: string; const mockUiSettingsClient = { get: sinon.stub(), @@ -67,6 +70,8 @@ describe('CSV Execute Job', function () { }); beforeEach(async function () { + content = ''; + stream = ({ write: jest.fn((chunk) => (content += chunk)) } as unknown) as typeof stream; configGetStub = sinon.stub(); configGetStub.withArgs('queue', 'timeout').returns(moment.duration('2m')); configGetStub.withArgs('index').returns('.reporting-foo-test'); @@ -124,7 +129,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); expect(mockEsClient.search).toHaveBeenCalled(); }); @@ -145,7 +151,7 @@ describe('CSV Execute Job', function () { }, }); - await runTask('job777', job, cancellationToken); + await runTask('job777', job, cancellationToken, stream); expect(mockEsClient.search).toHaveBeenCalledWith(expect.objectContaining({ body, index })); }); @@ -171,7 +177,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); expect(mockEsClient.scroll).toHaveBeenCalledWith( @@ -188,7 +195,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); expect(mockEsClient.search).toHaveBeenCalled(); @@ -221,7 +229,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); expect(mockEsClient.search).toHaveBeenCalled(); @@ -257,7 +266,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); expect(mockEsClient.clearScroll).toHaveBeenCalledWith( @@ -290,9 +300,9 @@ describe('CSV Execute Job', function () { conflictedTypesFields: undefined, searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( - `[TypeError: Cannot read property 'indexOf' of undefined]` - ); + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot(`[TypeError: Cannot read property 'indexOf' of undefined]`); expect(mockEsClient.clearScroll).toHaveBeenCalledWith( expect.objectContaining({ body: { scroll_id: lastScrollId } }) @@ -322,7 +332,8 @@ describe('CSV Execute Job', function () { const { csv_contains_formulas: csvContainsFormulas } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream ); expect(csvContainsFormulas).toEqual(true); @@ -349,7 +360,8 @@ describe('CSV Execute Job', function () { const { csv_contains_formulas: csvContainsFormulas } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream ); expect(csvContainsFormulas).toEqual(true); @@ -377,7 +389,8 @@ describe('CSV Execute Job', function () { const { csv_contains_formulas: csvContainsFormulas } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream ); expect(csvContainsFormulas).toEqual(false); @@ -406,7 +419,8 @@ describe('CSV Execute Job', function () { const { csv_contains_formulas: csvContainsFormulas } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream ); expect(csvContainsFormulas).toEqual(false); @@ -433,7 +447,8 @@ describe('CSV Execute Job', function () { const { csv_contains_formulas: csvContainsFormulas } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream ); expect(csvContainsFormulas).toEqual(false); @@ -459,7 +474,7 @@ describe('CSV Execute Job', function () { conflictedTypesFields: [], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toEqual(`${CSV_BOM_CHARS}one,two\none,bar\n`); }); @@ -482,7 +497,7 @@ describe('CSV Execute Job', function () { conflictedTypesFields: [], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toEqual('one,two\none,bar\n'); }); @@ -507,7 +522,7 @@ describe('CSV Execute Job', function () { conflictedTypesFields: [], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toEqual("one,two\n\"'=cmd|' /C calc'!A0\",bar\n"); }); @@ -530,7 +545,7 @@ describe('CSV Execute Job', function () { conflictedTypesFields: [], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toEqual('one,two\n"=cmd|\' /C calc\'!A0",bar\n'); }); @@ -545,9 +560,9 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( - `[Error]` - ); + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot(`[Error]`); }); it('should reject Promise if scroll call errors out', async function () { @@ -566,9 +581,9 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( - `[Error]` - ); + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot(`[Error]`); }); }); @@ -589,7 +604,9 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot( `[Error: Expected _scroll_id in the following Elasticsearch response: {"hits":{"hits":[{}]}}]` ); }); @@ -610,7 +627,9 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot( `[Error: Expected _scroll_id in the following Elasticsearch response: {"hits":{"hits":[]}}]` ); }); @@ -640,7 +659,9 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot( `[Error: Expected _scroll_id in the following Elasticsearch response: {"hits":{"hits":[{}]}}]` ); }); @@ -670,7 +691,9 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot( `[Error: Expected _scroll_id in the following Elasticsearch response: {"hits":{"hits":[]}}]` ); }); @@ -705,7 +728,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); await delay(250); @@ -729,7 +753,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); cancellationToken.cancel(); @@ -745,7 +770,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); await delay(100); cancellationToken.cancel(); @@ -767,7 +793,7 @@ describe('CSV Execute Job', function () { fields: ['one', 'two'], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toBe(`one,two\n`); }); @@ -779,7 +805,7 @@ describe('CSV Execute Job', function () { fields: ['one', 'two'], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toBe(`one;two\n`); }); @@ -791,7 +817,7 @@ describe('CSV Execute Job', function () { fields: ['one and a half', 'two', 'three-and-four', 'five & six'], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toBe(`"one and a half",two,"three-and-four","five & six"\n`); }); @@ -803,7 +829,7 @@ describe('CSV Execute Job', function () { fields: ['one and a half', 'two', 'three-and-four', 'five & six'], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toBe(`one and a half,two,three-and-four,five & six\n`); }); @@ -823,7 +849,7 @@ describe('CSV Execute Job', function () { fields: ['one', 'two'], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).not.toBe(null); const lines = content!.split('\n'); const headerLine = lines[0]; @@ -847,7 +873,7 @@ describe('CSV Execute Job', function () { conflictedTypesFields: [], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).not.toBe(null); const lines = content!.split('\n'); const valuesLine = lines[1]; @@ -879,7 +905,7 @@ describe('CSV Execute Job', function () { conflictedTypesFields: [], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).not.toBe(null); const lines = content!.split('\n'); @@ -913,7 +939,7 @@ describe('CSV Execute Job', function () { }, }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).not.toBe(null); const lines = content!.split('\n'); @@ -927,7 +953,6 @@ describe('CSV Execute Job', function () { // tests use these 'simple' characters to make the math easier describe('when only the headers exceed the maxSizeBytes', function () { - let content: string | null; let maxSizeReached: boolean | undefined; beforeEach(async function () { @@ -940,10 +965,11 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - ({ content, max_size_reached: maxSizeReached } = await runTask( + ({ max_size_reached: maxSizeReached } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream )); }); @@ -957,7 +983,6 @@ describe('CSV Execute Job', function () { }); describe('when headers are equal to maxSizeBytes', function () { - let content: string | null; let maxSizeReached: boolean | undefined; beforeEach(async function () { @@ -970,10 +995,11 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - ({ content, max_size_reached: maxSizeReached } = await runTask( + ({ max_size_reached: maxSizeReached } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream )); }); @@ -987,7 +1013,6 @@ describe('CSV Execute Job', function () { }); describe('when the data exceeds the maxSizeBytes', function () { - let content: string | null; let maxSizeReached: boolean | undefined; beforeEach(async function () { @@ -1010,10 +1035,11 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - ({ content, max_size_reached: maxSizeReached } = await runTask( + ({ max_size_reached: maxSizeReached } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream )); }); @@ -1027,7 +1053,6 @@ describe('CSV Execute Job', function () { }); describe('when headers and data equal the maxSizeBytes', function () { - let content: string | null; let maxSizeReached: boolean | undefined; beforeEach(async function () { @@ -1052,10 +1077,11 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - ({ content, max_size_reached: maxSizeReached } = await runTask( + ({ max_size_reached: maxSizeReached } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream )); }); @@ -1091,7 +1117,7 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(mockEsClient.search).toHaveBeenCalledWith( expect.objectContaining({ scroll: scrollDuration }) @@ -1119,7 +1145,7 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(mockEsClient.search).toHaveBeenCalledWith( expect.objectContaining({ size: scrollSize }) @@ -1147,7 +1173,7 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(mockEsClient.scroll).toHaveBeenCalledWith( expect.objectContaining({ body: { scroll: scrollDuration, scroll_id: 'scrollId' } }) diff --git a/x-pack/plugins/reporting/server/export_types/csv/execute_job.ts b/x-pack/plugins/reporting/server/export_types/csv/execute_job.ts index 57559d136ff3e..322cde38d7fd6 100644 --- a/x-pack/plugins/reporting/server/export_types/csv/execute_job.ts +++ b/x-pack/plugins/reporting/server/export_types/csv/execute_job.ts @@ -16,7 +16,7 @@ export const runTaskFnFactory: RunTaskFnFactory< > = function executeJobFactoryFn(reporting, parentLogger) { const config = reporting.getConfig(); - return async function runTask(jobId, job, cancellationToken) { + return async function runTask(jobId, job, cancellationToken, stream) { const elasticsearch = await reporting.getEsClient(); const logger = parentLogger.clone([jobId]); const generateCsv = createGenerateCsv(logger); @@ -27,18 +27,18 @@ export const runTaskFnFactory: RunTaskFnFactory< const uiSettingsClient = await reporting.getUiSettingsClient(fakeRequest, logger); const { asCurrentUser: elasticsearchClient } = elasticsearch.asScoped(fakeRequest); - const { content, maxSizeReached, size, csvContainsFormulas, warnings } = await generateCsv( + const { maxSizeReached, size, csvContainsFormulas, warnings } = await generateCsv( job, config, uiSettingsClient, elasticsearchClient, - cancellationToken + cancellationToken, + stream ); // @TODO: Consolidate these one-off warnings into the warnings array (max-size reached and csv contains formulas) return { content_type: CONTENT_TYPE_CSV, - content, max_size_reached: maxSizeReached, size, csv_contains_formulas: csvContainsFormulas, diff --git a/x-pack/plugins/reporting/server/export_types/csv/generate_csv/index.ts b/x-pack/plugins/reporting/server/export_types/csv/generate_csv/index.ts index c1018030827c0..9dfc32b90e8c5 100644 --- a/x-pack/plugins/reporting/server/export_types/csv/generate_csv/index.ts +++ b/x-pack/plugins/reporting/server/export_types/csv/generate_csv/index.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import { i18n } from '@kbn/i18n'; import { ElasticsearchClient, IUiSettingsClient } from 'src/core/server'; import { ReportingConfig } from '../../../'; @@ -57,12 +58,17 @@ export function createGenerateCsv(logger: LevelLogger) { config: ReportingConfig, uiSettingsClient: IUiSettingsClient, elasticsearchClient: ElasticsearchClient, - cancellationToken: CancellationToken + cancellationToken: CancellationToken, + stream: Writable ): Promise { const settings = await getUiSettings(job.browserTimezone, uiSettingsClient, config, logger); const escapeValue = createEscapeValue(settings.quoteValues, settings.escapeFormulaValues); const bom = config.get('csv', 'useByteOrderMarkEncoding') ? CSV_BOM_CHARS : ''; - const builder = new MaxSizeStringBuilder(byteSizeValueToNumber(settings.maxSizeBytes), bom); + const builder = new MaxSizeStringBuilder( + stream, + byteSizeValueToNumber(settings.maxSizeBytes), + bom + ); const { fields, metaFields, conflictedTypesFields } = job; const header = `${fields.map(escapeValue).join(settings.separator)}\n`; @@ -71,7 +77,6 @@ export function createGenerateCsv(logger: LevelLogger) { if (!builder.tryAppend(header)) { return { size: 0, - content: '', maxSizeReached: true, warnings: [], }; @@ -148,7 +153,6 @@ export function createGenerateCsv(logger: LevelLogger) { } return { - content: builder.getString(), csvContainsFormulas: csvContainsFormulas && !settings.escapeFormulaValues, maxSizeReached, size, diff --git a/x-pack/plugins/reporting/server/export_types/csv/types.d.ts b/x-pack/plugins/reporting/server/export_types/csv/types.d.ts index 604d451d822b6..8ba87445efd9d 100644 --- a/x-pack/plugins/reporting/server/export_types/csv/types.d.ts +++ b/x-pack/plugins/reporting/server/export_types/csv/types.d.ts @@ -78,7 +78,6 @@ type FormatsMapDeprecatedCSV = Map< >; export interface SavedSearchGeneratorResultDeprecatedCSV { - content: string; size: number; maxSizeReached: boolean; csvContainsFormulas?: boolean; diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.test.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.test.ts index b96828bb06334..497a01d2587ad 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.test.ts @@ -9,12 +9,14 @@ jest.mock('./generate_csv/generate_csv', () => ({ CsvGenerator: class CsvGeneratorMock { generateData() { return { - content: 'test\n123', + size: 123, + content_type: 'text/csv', }; } }, })); +import { Writable } from 'stream'; import nodeCrypto from '@elastic/node-crypto'; import { ReportingCore } from '../../'; import { CancellationToken } from '../../../common'; @@ -30,6 +32,7 @@ const encryptionKey = 'tetkey'; const headers = { sid: 'cooltestheaders' }; let encryptedHeaders: string; let reportingCore: ReportingCore; +let stream: jest.Mocked; beforeAll(async () => { const crypto = nodeCrypto({ encryptionKey }); @@ -48,6 +51,10 @@ beforeAll(async () => { ); }); +beforeEach(() => { + stream = {} as typeof stream; +}); + test('gets the csv content from job parameters', async () => { const runTask = runTaskFnFactory(reportingCore, logger); @@ -61,13 +68,14 @@ test('gets the csv content from job parameters', async () => { title: 'Test Search', version: '7.13.0', }, - new CancellationToken() + new CancellationToken(), + stream ); expect(payload).toMatchInlineSnapshot(` Object { - "content": "test - 123", + "content_type": "text/csv", + "size": 123, } `); }); diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.ts index ff50377ab13c5..97f0aa65e3d68 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.ts @@ -18,7 +18,7 @@ export const runTaskFnFactory: RunTaskFnFactory> = ( ) => { const config = reporting.getConfig(); - return async function runTask(jobId, job, cancellationToken) { + return async function runTask(jobId, job, cancellationToken, stream) { const logger = parentLogger.clone([CSV_JOB_TYPE, 'execute-job', jobId]); const encryptionKey = config.get('encryptionKey'); @@ -43,7 +43,15 @@ export const runTaskFnFactory: RunTaskFnFactory> = ( fieldFormatsRegistry, }; - const csv = new CsvGenerator(job, config, clients, dependencies, cancellationToken, logger); + const csv = new CsvGenerator( + job, + config, + clients, + dependencies, + cancellationToken, + logger, + stream + ); return await csv.generateData(); }; }; diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts index 596cb5a314414..b0a3a8c7001b1 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import * as Rx from 'rxjs'; import { identity, range } from 'lodash'; import { IScopedClusterClient, IUiSettingsClient, SearchResponse } from 'src/core/server'; @@ -40,6 +41,8 @@ let mockEsClient: IScopedClusterClient; let mockDataClient: IScopedSearchClient; let mockConfig: ReportingConfig; let uiSettingsClient: IUiSettingsClient; +let stream: jest.Mocked; +let content: string; const searchSourceMock = { ...searchSourceInstanceMock }; const mockSearchSourceService: jest.Mocked = { @@ -83,6 +86,8 @@ const mockFieldFormatsRegistry = ({ } as unknown) as FieldFormatsRegistry; beforeEach(async () => { + content = ''; + stream = ({ write: jest.fn((chunk) => (content += chunk)) } as unknown) as typeof stream; mockEsClient = elasticsearchServiceMock.createScopedClusterClient(); mockDataClient = dataPluginMock.createStartContract().search.asScoped({} as any); mockDataClient.search = mockDataClientSearchDefault; @@ -131,10 +136,11 @@ it('formats an empty search result to CSV content', async () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(false); }); @@ -170,10 +176,11 @@ it('formats a search result to CSV content', async () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(false); }); @@ -214,7 +221,8 @@ it('calculates the bytes of the content', async () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); expect(csvResult.size).toBe(2608); @@ -266,12 +274,13 @@ it('warns if max size was reached', async () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); expect(csvResult.max_size_reached).toBe(true); expect(csvResult.warnings).toEqual([]); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); }); it('uses the scrollId to page all the data', async () => { @@ -319,11 +328,12 @@ it('uses the scrollId to page all the data', async () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); expect(csvResult.warnings).toEqual([]); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(mockDataClient.search).toHaveBeenCalledTimes(1); expect(mockDataClient.search).toBeCalledWith( @@ -384,11 +394,12 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); - const csvResult = await generateCsv.generateData(); + await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); }); it('provides top-level underscored fields as columns', async () => { @@ -440,12 +451,13 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(false); }); @@ -504,12 +516,13 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(false); }); }); @@ -550,11 +563,12 @@ describe('fields from job.columns (7.13+ generated)', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); - const csvResult = await generateCsv.generateData(); + await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); }); it('columns can be top-level fields such as _id and _index', async () => { @@ -592,11 +606,12 @@ describe('fields from job.columns (7.13+ generated)', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); - const csvResult = await generateCsv.generateData(); + await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); }); it('empty columns defaults to using searchSource.getFields()', async () => { @@ -640,11 +655,12 @@ describe('fields from job.columns (7.13+ generated)', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); - const csvResult = await generateCsv.generateData(); + await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); }); }); @@ -684,12 +700,13 @@ describe('formulas', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(false); }); @@ -733,12 +750,13 @@ describe('formulas', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(false); }); @@ -785,12 +803,13 @@ describe('formulas', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(true); }); }); diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index 7eaf1ef95c149..3855eff3821b9 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import { i18n } from '@kbn/i18n'; import type { estypes } from '@elastic/elasticsearch'; import { IScopedClusterClient, IUiSettingsClient } from 'src/core/server'; @@ -73,7 +74,8 @@ export class CsvGenerator { private clients: Clients, private dependencies: Dependencies, private cancellationToken: CancellationToken, - private logger: LevelLogger + private logger: LevelLogger, + private stream: Writable ) {} private async scan( @@ -290,7 +292,7 @@ export class CsvGenerator { const { maxSizeBytes, bom, escapeFormulaValues, scroll: scrollSettings } = settings; - const builder = new MaxSizeStringBuilder(byteSizeValueToNumber(maxSizeBytes), bom); + const builder = new MaxSizeStringBuilder(this.stream, byteSizeValueToNumber(maxSizeBytes), bom); const warnings: string[] = []; let first = true; let currentRecord = -1; @@ -403,7 +405,6 @@ export class CsvGenerator { ); return { - content: builder.getString(), content_type: CONTENT_TYPE_CSV, csv_contains_formulas: this.csvContainsFormulas && !escapeFormulaValues, max_size_reached: this.maxSizeReached, diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.test.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.test.ts index b0340a53fd603..27d3719f71f93 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.test.ts @@ -5,85 +5,86 @@ * 2.0. */ -import expect from '@kbn/expect'; +import { Writable } from 'stream'; import { MaxSizeStringBuilder } from './max_size_string_builder'; +let content: string; +let stream: jest.Mocked; + describe('MaxSizeStringBuilder', function () { + beforeEach(() => { + content = ''; + stream = ({ write: jest.fn((chunk) => (content += chunk)) } as unknown) as typeof stream; + }); + describe('tryAppend', function () { it(`should return true if appended string is under maxSize`, function () { - const builder = new MaxSizeStringBuilder(100); + const builder = new MaxSizeStringBuilder(stream, 100); const result = builder.tryAppend('aa'); - expect(result).to.be(true); + expect(result).toBe(true); }); it(`should return false if appended string is over the maxSize`, function () { - const builder = new MaxSizeStringBuilder(1); + const builder = new MaxSizeStringBuilder(stream, 1); const result = builder.tryAppend('aa'); - expect(result).to.be(false); + expect(result).toBe(false); }); it(`should return true then false if second appended string puts total size over the maxSize`, function () { - const builder = new MaxSizeStringBuilder(1); - expect(builder.tryAppend('a')).to.be(true); - expect(builder.tryAppend('a')).to.be(false); - }); - }); - - describe('getBuffer', function () { - it(`should return an empty string when we don't call tryAppend`, function () { - const builder = new MaxSizeStringBuilder(100); - expect(builder.getString()).to.be(''); + const builder = new MaxSizeStringBuilder(stream, 1); + expect(builder.tryAppend('a')).toBe(true); + expect(builder.tryAppend('a')).toBe(false); }); - it('should return equivalent string if tryAppend called once and less than maxSize', function () { + it('should write equivalent string if called once and less than maxSize', function () { const str = 'foo'; - const builder = new MaxSizeStringBuilder(100); + const builder = new MaxSizeStringBuilder(stream, 100); builder.tryAppend(str); - expect(builder.getString()).to.be(str); + expect(content).toBe(str); }); - it('should return equivalent string if tryAppend called multiple times and total size less than maxSize', function () { + it('should write equivalent string if called multiple times and total size less than maxSize', function () { const strs = ['foo', 'bar', 'baz']; - const builder = new MaxSizeStringBuilder(100); + const builder = new MaxSizeStringBuilder(stream, 100); strs.forEach((str) => builder.tryAppend(str)); - expect(builder.getString()).to.be(strs.join('')); + expect(content).toBe(strs.join('')); }); - it('should return empty string if tryAppend called one time with size greater than maxSize', function () { + it('should write empty string if called one time with size greater than maxSize', function () { const str = 'aa'; // each a is one byte - const builder = new MaxSizeStringBuilder(1); + const builder = new MaxSizeStringBuilder(stream, 1); builder.tryAppend(str); - expect(builder.getString()).to.be(''); + expect(content).toBe(''); }); - it('should return partial string if tryAppend called multiple times with total size greater than maxSize', function () { + it('should write partial string if called multiple times with total size greater than maxSize', function () { const str = 'a'; // each a is one byte - const builder = new MaxSizeStringBuilder(1); + const builder = new MaxSizeStringBuilder(stream, 1); builder.tryAppend(str); builder.tryAppend(str); - expect(builder.getString()).to.be('a'); + expect(content).toBe('a'); }); - it('should return string with bom character prepended', function () { + it('should write string with bom character prepended', function () { const str = 'a'; // each a is one byte - const builder = new MaxSizeStringBuilder(1, '∆'); + const builder = new MaxSizeStringBuilder(stream, 1, '∆'); builder.tryAppend(str); builder.tryAppend(str); - expect(builder.getString()).to.be('∆a'); + expect(content).toBe('∆a'); }); }); describe('getSizeInBytes', function () { it(`should return 0 when no strings have been appended`, function () { - const builder = new MaxSizeStringBuilder(100); - expect(builder.getSizeInBytes()).to.be(0); + const builder = new MaxSizeStringBuilder(stream, 100); + expect(builder.getSizeInBytes()).toBe(0); }); - it(`should the size in bytes`, function () { - const builder = new MaxSizeStringBuilder(100); + it(`should return the size in bytes`, function () { + const builder = new MaxSizeStringBuilder(stream, 100); const stringValue = 'foobar'; builder.tryAppend(stringValue); - expect(builder.getSizeInBytes()).to.be(stringValue.length); + expect(builder.getSizeInBytes()).toBe(stringValue.length); }); }); }); diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.ts index d5954083c5a0f..945464ac6c01c 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.ts @@ -5,35 +5,32 @@ * 2.0. */ +import { Writable } from 'stream'; + export class MaxSizeStringBuilder { - private _buffer: Buffer; - private _size: number; - private _maxSize: number; - private _bom: string; - - constructor(maxSizeBytes: number, bom = '') { - this._buffer = Buffer.alloc(maxSizeBytes); - this._size = 0; - this._maxSize = maxSizeBytes; - this._bom = bom; - } + private size = 0; + private pristine = true; + + constructor(private stream: Writable, private maxSizeBytes: number, private bom = '') {} - tryAppend(str: string) { - const byteLength = Buffer.byteLength(str); - if (this._size + byteLength <= this._maxSize) { - this._buffer.write(str, this._size); - this._size += byteLength; - return true; + tryAppend(chunk: string): boolean { + const byteLength = Buffer.byteLength(chunk); + if (this.size + byteLength > this.maxSizeBytes) { + return false; } - return false; - } + if (this.pristine) { + this.stream.write(this.bom); + this.pristine = false; + } + + this.stream.write(chunk); + this.size += byteLength; - getSizeInBytes() { - return this._size; + return true; } - getString() { - return this._bom + this._buffer.slice(0, this._size).toString(); + getSizeInBytes(): number { + return this.size; } } diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource_immediate/execute_job.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource_immediate/execute_job.ts index e59c38e16ab47..c261fa62d97d4 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource_immediate/execute_job.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource_immediate/execute_job.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import { KibanaRequest } from 'src/core/server'; import { CancellationToken } from '../../../common'; import { CSV_SEARCHSOURCE_IMMEDIATE_TYPE } from '../../../common/constants'; @@ -22,6 +23,7 @@ export type ImmediateExecuteFn = ( jobId: null, job: JobParamsDownloadCSV, context: ReportingRequestHandlerContext, + stream: Writable, req: KibanaRequest ) => Promise; @@ -32,7 +34,7 @@ export const runTaskFnFactory: RunTaskFnFactory = function e const config = reporting.getConfig(); const logger = parentLogger.clone([CSV_SEARCHSOURCE_IMMEDIATE_TYPE, 'execute-job']); - return async function runTask(_jobId, immediateJobParams, context, req) { + return async function runTask(_jobId, immediateJobParams, context, stream, req) { const job = { objectType: 'immediate-search', ...immediateJobParams, @@ -58,7 +60,15 @@ export const runTaskFnFactory: RunTaskFnFactory = function e }; const cancellationToken = new CancellationToken(); - const csv = new CsvGenerator(job, config, clients, dependencies, cancellationToken, logger); + const csv = new CsvGenerator( + job, + config, + clients, + dependencies, + cancellationToken, + logger, + stream + ); const result = await csv.generateData(); if (result.csv_contains_formulas) { diff --git a/x-pack/plugins/reporting/server/export_types/png/execute_job/index.test.ts b/x-pack/plugins/reporting/server/export_types/png/execute_job/index.test.ts index ee264f7c57ff6..34cfa66ddd5e1 100644 --- a/x-pack/plugins/reporting/server/export_types/png/execute_job/index.test.ts +++ b/x-pack/plugins/reporting/server/export_types/png/execute_job/index.test.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import * as Rx from 'rxjs'; import { ReportingCore } from '../../../'; import { CancellationToken } from '../../../../common'; @@ -20,7 +21,9 @@ import { runTaskFnFactory } from './'; jest.mock('../lib/generate_png', () => ({ generatePngObservableFactory: jest.fn() })); +let content: string; let mockReporting: ReportingCore; +let stream: jest.Mocked; const cancellationToken = ({ on: jest.fn(), @@ -44,6 +47,9 @@ const encryptHeaders = async (headers: Record) => { const getBasePayload = (baseObj: any) => baseObj as TaskPayloadPNG; beforeEach(async () => { + content = ''; + stream = ({ write: jest.fn((chunk) => (content += chunk)) } as unknown) as typeof stream; + const mockReportingConfig = createMockConfigSchema({ index: '.reporting-2018.10.10', encryptionKey: mockEncryptionKey, @@ -75,7 +81,8 @@ test(`passes browserTimezone to generatePng`, async () => { browserTimezone, headers: encryptedHeaders, }), - cancellationToken + cancellationToken, + stream ); expect(generatePngObservable.mock.calls).toMatchInlineSnapshot(` @@ -119,7 +126,8 @@ test(`returns content_type of application/png`, async () => { const { content_type: contentType } = await runTask( 'pngJobId', getBasePayload({ relativeUrl: '/app/kibana#/something', headers: encryptedHeaders }), - cancellationToken + cancellationToken, + stream ); expect(contentType).toBe('image/png'); }); @@ -131,10 +139,11 @@ test(`returns content of generatePng getBuffer base64 encoded`, async () => { const runTask = await runTaskFnFactory(mockReporting, getMockLogger()); const encryptedHeaders = await encryptHeaders({}); - const { content } = await runTask( + await runTask( 'pngJobId', getBasePayload({ relativeUrl: '/app/kibana#/something', headers: encryptedHeaders }), - cancellationToken + cancellationToken, + stream ); expect(content).toEqual(testContent); diff --git a/x-pack/plugins/reporting/server/export_types/png/execute_job/index.ts b/x-pack/plugins/reporting/server/export_types/png/execute_job/index.ts index 1dffd0ed4dd5a..1027e895b2cd0 100644 --- a/x-pack/plugins/reporting/server/export_types/png/execute_job/index.ts +++ b/x-pack/plugins/reporting/server/export_types/png/execute_job/index.ts @@ -7,7 +7,7 @@ import apm from 'elastic-apm-node'; import * as Rx from 'rxjs'; -import { catchError, finalize, map, mergeMap, takeUntil } from 'rxjs/operators'; +import { catchError, finalize, map, mergeMap, takeUntil, tap } from 'rxjs/operators'; import { PNG_JOB_TYPE } from '../../../../common/constants'; import { TaskRunResult } from '../../../lib/tasks'; import { RunTaskFn, RunTaskFnFactory } from '../../../types'; @@ -26,7 +26,7 @@ export const runTaskFnFactory: RunTaskFnFactory< const config = reporting.getConfig(); const encryptionKey = config.get('encryptionKey'); - return async function runTask(jobId, job, cancellationToken) { + return async function runTask(jobId, job, cancellationToken, stream) { const apmTrans = apm.startTransaction('reporting execute_job png', 'reporting'); const apmGetAssets = apmTrans?.startSpan('get_assets', 'setup'); let apmGeneratePng: { end: () => void } | null | undefined; @@ -51,9 +51,9 @@ export const runTaskFnFactory: RunTaskFnFactory< job.layout ); }), + tap(({ base64 }) => stream.write(base64)), map(({ base64, warnings }) => ({ content_type: 'image/png', - content: base64, size: (base64 && base64.length) || 0, warnings, })), diff --git a/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.test.ts b/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.test.ts index a9863a7edf607..d58ec4cde0f3d 100644 --- a/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.test.ts +++ b/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.test.ts @@ -7,6 +7,7 @@ jest.mock('../lib/generate_pdf', () => ({ generatePdfObservableFactory: jest.fn() })); +import { Writable } from 'stream'; import * as Rx from 'rxjs'; import { ReportingCore } from '../../../'; import { CancellationToken } from '../../../../common'; @@ -16,7 +17,9 @@ import { generatePdfObservableFactory } from '../lib/generate_pdf'; import { TaskPayloadPDF } from '../types'; import { runTaskFnFactory } from './'; +let content: string; let mockReporting: ReportingCore; +let stream: jest.Mocked; const cancellationToken = ({ on: jest.fn(), @@ -40,6 +43,9 @@ const encryptHeaders = async (headers: Record) => { const getBasePayload = (baseObj: any) => baseObj as TaskPayloadPDF; beforeEach(async () => { + content = ''; + stream = ({ write: jest.fn((chunk) => (content += chunk)) } as unknown) as typeof stream; + const reportingConfig = { 'server.basePath': '/sbp', index: '.reports-test', @@ -71,7 +77,8 @@ test(`passes browserTimezone to generatePdf`, async () => { browserTimezone, headers: encryptedHeaders, }), - cancellationToken + cancellationToken, + stream ); const tzParam = generatePdfObservable.mock.calls[0][3]; @@ -89,7 +96,8 @@ test(`returns content_type of application/pdf`, async () => { const { content_type: contentType } = await runTask( 'pdfJobId', getBasePayload({ relativeUrls: [], headers: encryptedHeaders }), - cancellationToken + cancellationToken, + stream ); expect(contentType).toBe('application/pdf'); }); @@ -101,10 +109,11 @@ test(`returns content of generatePdf getBuffer base64 encoded`, async () => { const runTask = runTaskFnFactory(mockReporting, getMockLogger()); const encryptedHeaders = await encryptHeaders({}); - const { content } = await runTask( + await runTask( 'pdfJobId', getBasePayload({ relativeUrls: [], headers: encryptedHeaders }), - cancellationToken + cancellationToken, + stream ); expect(content).toEqual(Buffer.from(testContent).toString('base64')); diff --git a/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.ts b/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.ts index 8e215f87b52e0..a878c51ba02e2 100644 --- a/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.ts +++ b/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.ts @@ -27,7 +27,7 @@ export const runTaskFnFactory: RunTaskFnFactory< const config = reporting.getConfig(); const encryptionKey = config.get('encryptionKey'); - return async function runTask(jobId, job, cancellationToken) { + return async function runTask(jobId, job, cancellationToken, stream) { const jobLogger = parentLogger.clone([PDF_JOB_TYPE, 'execute-job', jobId]); const apmTrans = apm.startTransaction('reporting execute_job pdf', 'reporting'); const apmGetAssets = apmTrans?.startSpan('get_assets', 'setup'); @@ -46,7 +46,7 @@ export const runTaskFnFactory: RunTaskFnFactory< const urls = getFullUrls(config, job); const { browserTimezone, layout, title } = job; - if (apmGetAssets) apmGetAssets.end(); + apmGetAssets?.end(); apmGeneratePdf = apmTrans?.startSpan('generate_pdf_pipeline', 'execute'); return generatePdfObservable( @@ -60,15 +60,15 @@ export const runTaskFnFactory: RunTaskFnFactory< ); }), map(({ buffer, warnings }) => { - if (apmGeneratePdf) apmGeneratePdf.end(); - + apmGeneratePdf?.end(); const apmEncode = apmTrans?.startSpan('encode_pdf', 'output'); const content = buffer?.toString('base64') || null; - if (apmEncode) apmEncode.end(); + apmEncode?.end(); + + stream.write(content); return { content_type: 'application/pdf', - content, size: buffer?.byteLength || 0, warnings, }; @@ -81,7 +81,7 @@ export const runTaskFnFactory: RunTaskFnFactory< const stop$ = Rx.fromEventPattern(cancellationToken.on); - if (apmTrans) apmTrans.end(); + apmTrans?.end(); return process$.pipe(takeUntil(stop$)).toPromise(); }; }; diff --git a/x-pack/plugins/reporting/server/lib/store/store.ts b/x-pack/plugins/reporting/server/lib/store/store.ts index c8c5f82b59039..da6460aa6a2a7 100644 --- a/x-pack/plugins/reporting/server/lib/store/store.ts +++ b/x-pack/plugins/reporting/server/lib/store/store.ts @@ -9,7 +9,7 @@ import { IndexResponse, UpdateResponse } from '@elastic/elasticsearch/api/types' import { ElasticsearchClient } from 'src/core/server'; import { LevelLogger, statuses } from '../'; import { ReportingCore } from '../../'; -import { JobStatus, TaskRunResult } from '../../../common/types'; +import { JobStatus, ReportOutput } from '../../../common/types'; import { ILM_POLICY_NAME } from '../../../common/constants'; @@ -36,12 +36,12 @@ export type ReportProcessingFields = Required<{ export type ReportFailedFields = Required<{ completed_at: Report['completed_at']; - output: Report['output']; + output: ReportOutput | null; }>; export type ReportCompletedFields = Required<{ completed_at: Report['completed_at']; - output: Omit | null; + output: Omit | null; }>; /* diff --git a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts index b0c8978a02f6a..1bbaa406088af 100644 --- a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts +++ b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { Readable, pipeline } from 'stream'; +import { Writable, finished } from 'stream'; import { promisify } from 'util'; import { UpdateResponse } from '@elastic/elasticsearch/api/types'; import moment from 'moment'; @@ -19,6 +19,7 @@ import { TaskRunCreatorFunction, } from '../../../../task_manager/server'; import { CancellationToken } from '../../../common'; +import { ReportOutput } from '../../../common/types'; import { durationToNumber, numberToDuration } from '../../../common/schema_utils'; import { ReportingConfigType } from '../../config'; import { BasePayload, RunTaskFn } from '../../types'; @@ -40,8 +41,8 @@ interface ReportingExecuteTaskInstance { runAt?: Date; } -function isOutput(output: TaskRunResult | Error): output is TaskRunResult { - return typeof output === 'object' && (output as TaskRunResult).content != null; +function isOutput(output: any): output is TaskRunResult { + return output?.size != null; } function reportFromTask(task: ReportTaskParams) { @@ -203,12 +204,11 @@ export class ExecuteReportTask implements ReportingTask { return await store.setReportFailed(report, doc); } - private _formatOutput(output: TaskRunResult | Error): TaskRunResult { - const docOutput = {} as TaskRunResult; + private _formatOutput(output: TaskRunResult | Error): ReportOutput { + const docOutput = {} as ReportOutput; const unknownMime = null; if (isOutput(output)) { - docOutput.content = output.content; docOutput.content_type = output.content_type || unknownMime; docOutput.max_size_reached = output.max_size_reached; docOutput.csv_contains_formulas = output.csv_contains_formulas; @@ -227,7 +227,8 @@ export class ExecuteReportTask implements ReportingTask { public async _performJob( task: ReportTaskParams, - cancellationToken: CancellationToken + cancellationToken: CancellationToken, + stream: Writable ): Promise { if (!this.taskExecutors) { throw new Error(`Task run function factories have not been called yet!`); @@ -242,7 +243,7 @@ export class ExecuteReportTask implements ReportingTask { // run the report // if workerFn doesn't finish before timeout, call the cancellationToken and throw an error const queueTimeout = durationToNumber(this.config.queue.timeout); - return Rx.from(runner(task.id, task.payload, cancellationToken)) + return Rx.from(runner(task.id, task.payload, cancellationToken, stream)) .pipe(timeout(queueTimeout)) // throw an error if a value is not emitted before timeout .toPromise(); } @@ -253,18 +254,7 @@ export class ExecuteReportTask implements ReportingTask { this.logger.debug(`Saving ${report.jobtype} to ${docId}.`); const completedTime = moment().toISOString(); - const { content, ...docOutput } = this._formatOutput(output); - const stream = await getContentStream(this.reporting, { - id: report._id, - index: report._index!, - if_primary_term: report._primary_term, - if_seq_no: report._seq_no, - }); - - await promisify(pipeline)(Readable.from(content ?? ''), stream); - report._seq_no = stream.getSeqNo(); - report._primary_term = stream.getPrimaryTerm(); - + const docOutput = this._formatOutput(output); const store = await this.getStore(); const doc = { completed_at: completedTime, @@ -332,7 +322,20 @@ export class ExecuteReportTask implements ReportingTask { this.logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`); try { - const output = await this._performJob(task, cancellationToken); + const stream = await getContentStream(this.reporting, { + id: report._id, + index: report._index!, + if_primary_term: report._primary_term, + if_seq_no: report._seq_no, + }); + const output = await this._performJob(task, cancellationToken, stream); + + stream.end(); + await promisify(finished)(stream, { readable: false }); + + report._seq_no = stream.getSeqNo(); + report._primary_term = stream.getPrimaryTerm(); + if (output) { report = await this._completeJob(report, output); } diff --git a/x-pack/plugins/reporting/server/routes/csv_searchsource_immediate.ts b/x-pack/plugins/reporting/server/routes/csv_searchsource_immediate.ts index 8d31c03c618c9..3d482d4f84d52 100644 --- a/x-pack/plugins/reporting/server/routes/csv_searchsource_immediate.ts +++ b/x-pack/plugins/reporting/server/routes/csv_searchsource_immediate.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import { schema } from '@kbn/config-schema'; import { KibanaRequest } from 'src/core/server'; import { ReportingCore } from '../'; @@ -67,11 +68,23 @@ export function registerGenerateCsvFromSavedObjectImmediate( const runTaskFn = runTaskFnFactory(reporting, logger); try { + let buffer = Buffer.from(''); + const stream = new Writable({ + write(chunk, encoding, callback) { + buffer = Buffer.concat([ + buffer, + Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding), + ]); + callback(); + }, + }); + const { content_type: jobOutputContentType, - content: jobOutputContent, size: jobOutputSize, - }: TaskRunResult = await runTaskFn(null, req.body, context, req); + }: TaskRunResult = await runTaskFn(null, req.body, context, stream, req); + stream.end(); + const jobOutputContent = buffer.toString(); logger.info(`Job output size: ${jobOutputSize} bytes`); diff --git a/x-pack/plugins/reporting/server/types.ts b/x-pack/plugins/reporting/server/types.ts index da228b09f79d2..16cd247b4d00e 100644 --- a/x-pack/plugins/reporting/server/types.ts +++ b/x-pack/plugins/reporting/server/types.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import type { IRouter, KibanaRequest, RequestHandlerContext } from 'src/core/server'; // eslint-disable-next-line @kbn/eslint/no-restricted-paths import { DataPluginStart } from 'src/plugins/data/server/plugin'; @@ -76,7 +77,8 @@ export type CreateJobFn = ( jobId: string, payload: ReportTaskParams['payload'], - cancellationToken: CancellationToken + cancellationToken: CancellationToken, + stream: Writable ) => Promise; export type CreateJobFnFactory = (