From 7e3a4491eeaf8567b55947b9da85201db6c34d39 Mon Sep 17 00:00:00 2001 From: Michael Dokolin Date: Tue, 3 Aug 2021 23:10:27 +0200 Subject: [PATCH] Update jobs executors to use content stream instead of returning report contents --- x-pack/plugins/reporting/common/types.ts | 8 +- .../export_types/csv/execute_job.test.ts | 134 +++++++++++------- .../server/export_types/csv/execute_job.ts | 8 +- .../export_types/csv/generate_csv/index.ts | 12 +- .../server/export_types/csv/types.d.ts | 1 - .../csv_searchsource/execute_job.test.ts | 16 ++- .../csv_searchsource/execute_job.ts | 12 +- .../generate_csv/generate_csv.test.ts | 81 +++++++---- .../generate_csv/generate_csv.ts | 7 +- .../max_size_string_builder.test.ts | 71 +++++----- .../generate_csv/max_size_string_builder.ts | 43 +++--- .../csv_searchsource_immediate/execute_job.ts | 14 +- .../png/execute_job/index.test.ts | 17 ++- .../export_types/png/execute_job/index.ts | 6 +- .../printable_pdf/execute_job/index.test.ts | 17 ++- .../printable_pdf/execute_job/index.ts | 14 +- .../reporting/server/lib/store/store.ts | 6 +- .../server/lib/tasks/execute_report.ts | 45 +++--- .../routes/csv_searchsource_immediate.ts | 17 ++- x-pack/plugins/reporting/server/types.ts | 4 +- 20 files changed, 322 insertions(+), 211 deletions(-) diff --git a/x-pack/plugins/reporting/common/types.ts b/x-pack/plugins/reporting/common/types.ts index ba200ccd1ffab..7bcf69b564b3c 100644 --- a/x-pack/plugins/reporting/common/types.ts +++ b/x-pack/plugins/reporting/common/types.ts @@ -44,7 +44,7 @@ export interface ReportDocumentHead { _primary_term: number; } -export interface TaskRunResult { +export interface ReportOutput { content_type: string | null; content: string | null; size: number; @@ -53,6 +53,8 @@ export interface TaskRunResult { warnings?: string[]; } +export type TaskRunResult = Omit; + export interface ReportSource { /* * Required fields: populated in enqueue_job when the request comes in to @@ -73,7 +75,7 @@ export interface ReportSource { /* * `output` is only populated if the report job is completed or failed. */ - output: TaskRunResult | null; + output: ReportOutput | null; /* * Optional fields: populated when the job is claimed to execute, and after @@ -127,7 +129,7 @@ export type JobStatus = */ interface ReportSimple extends Omit { payload: Omit; - output?: Omit; // is undefined for report jobs that are not completed + output?: Omit; // is undefined for report jobs that are not completed } /* diff --git a/x-pack/plugins/reporting/server/export_types/csv/execute_job.test.ts b/x-pack/plugins/reporting/server/export_types/csv/execute_job.test.ts index 0e8a7016b853b..65b53b3b77eb4 100644 --- a/x-pack/plugins/reporting/server/export_types/csv/execute_job.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv/execute_job.test.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import type { DeeplyMockedKeys } from '@kbn/utility-types/jest'; import nodeCrypto from '@elastic/node-crypto'; import { ElasticsearchClient, IUiSettingsClient } from 'kibana/server'; @@ -56,6 +57,8 @@ describe('CSV Execute Job', function () { let mockReportingConfig: ReportingConfig; let mockReportingCore: ReportingCore; let cancellationToken: any; + let stream: jest.Mocked; + let content: string; const mockUiSettingsClient = { get: sinon.stub(), @@ -67,6 +70,8 @@ describe('CSV Execute Job', function () { }); beforeEach(async function () { + content = ''; + stream = ({ write: jest.fn((chunk) => (content += chunk)) } as unknown) as typeof stream; configGetStub = sinon.stub(); configGetStub.withArgs('queue', 'timeout').returns(moment.duration('2m')); configGetStub.withArgs('index').returns('.reporting-foo-test'); @@ -124,7 +129,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); expect(mockEsClient.search).toHaveBeenCalled(); }); @@ -145,7 +151,7 @@ describe('CSV Execute Job', function () { }, }); - await runTask('job777', job, cancellationToken); + await runTask('job777', job, cancellationToken, stream); expect(mockEsClient.search).toHaveBeenCalledWith(expect.objectContaining({ body, index })); }); @@ -171,7 +177,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); expect(mockEsClient.scroll).toHaveBeenCalledWith( @@ -188,7 +195,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); expect(mockEsClient.search).toHaveBeenCalled(); @@ -221,7 +229,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); expect(mockEsClient.search).toHaveBeenCalled(); @@ -257,7 +266,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); expect(mockEsClient.clearScroll).toHaveBeenCalledWith( @@ -290,9 +300,9 @@ describe('CSV Execute Job', function () { conflictedTypesFields: undefined, searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( - `[TypeError: Cannot read property 'indexOf' of undefined]` - ); + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot(`[TypeError: Cannot read property 'indexOf' of undefined]`); expect(mockEsClient.clearScroll).toHaveBeenCalledWith( expect.objectContaining({ body: { scroll_id: lastScrollId } }) @@ -322,7 +332,8 @@ describe('CSV Execute Job', function () { const { csv_contains_formulas: csvContainsFormulas } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream ); expect(csvContainsFormulas).toEqual(true); @@ -349,7 +360,8 @@ describe('CSV Execute Job', function () { const { csv_contains_formulas: csvContainsFormulas } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream ); expect(csvContainsFormulas).toEqual(true); @@ -377,7 +389,8 @@ describe('CSV Execute Job', function () { const { csv_contains_formulas: csvContainsFormulas } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream ); expect(csvContainsFormulas).toEqual(false); @@ -406,7 +419,8 @@ describe('CSV Execute Job', function () { const { csv_contains_formulas: csvContainsFormulas } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream ); expect(csvContainsFormulas).toEqual(false); @@ -433,7 +447,8 @@ describe('CSV Execute Job', function () { const { csv_contains_formulas: csvContainsFormulas } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream ); expect(csvContainsFormulas).toEqual(false); @@ -459,7 +474,7 @@ describe('CSV Execute Job', function () { conflictedTypesFields: [], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toEqual(`${CSV_BOM_CHARS}one,two\none,bar\n`); }); @@ -482,7 +497,7 @@ describe('CSV Execute Job', function () { conflictedTypesFields: [], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toEqual('one,two\none,bar\n'); }); @@ -507,7 +522,7 @@ describe('CSV Execute Job', function () { conflictedTypesFields: [], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toEqual("one,two\n\"'=cmd|' /C calc'!A0\",bar\n"); }); @@ -530,7 +545,7 @@ describe('CSV Execute Job', function () { conflictedTypesFields: [], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toEqual('one,two\n"=cmd|\' /C calc\'!A0",bar\n'); }); @@ -545,9 +560,9 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( - `[Error]` - ); + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot(`[Error]`); }); it('should reject Promise if scroll call errors out', async function () { @@ -566,9 +581,9 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( - `[Error]` - ); + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot(`[Error]`); }); }); @@ -589,7 +604,9 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot( `[Error: Expected _scroll_id in the following Elasticsearch response: {"hits":{"hits":[{}]}}]` ); }); @@ -610,7 +627,9 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot( `[Error: Expected _scroll_id in the following Elasticsearch response: {"hits":{"hits":[]}}]` ); }); @@ -640,7 +659,9 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot( `[Error: Expected _scroll_id in the following Elasticsearch response: {"hits":{"hits":[{}]}}]` ); }); @@ -670,7 +691,9 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }); - await expect(runTask('job123', jobParams, cancellationToken)).rejects.toMatchInlineSnapshot( + await expect( + runTask('job123', jobParams, cancellationToken, stream) + ).rejects.toMatchInlineSnapshot( `[Error: Expected _scroll_id in the following Elasticsearch response: {"hits":{"hits":[]}}]` ); }); @@ -705,7 +728,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); await delay(250); @@ -729,7 +753,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); cancellationToken.cancel(); @@ -745,7 +770,8 @@ describe('CSV Execute Job', function () { fields: [], searchRequest: { index: null, body: null }, }), - cancellationToken + cancellationToken, + stream ); await delay(100); cancellationToken.cancel(); @@ -767,7 +793,7 @@ describe('CSV Execute Job', function () { fields: ['one', 'two'], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toBe(`one,two\n`); }); @@ -779,7 +805,7 @@ describe('CSV Execute Job', function () { fields: ['one', 'two'], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toBe(`one;two\n`); }); @@ -791,7 +817,7 @@ describe('CSV Execute Job', function () { fields: ['one and a half', 'two', 'three-and-four', 'five & six'], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toBe(`"one and a half",two,"three-and-four","five & six"\n`); }); @@ -803,7 +829,7 @@ describe('CSV Execute Job', function () { fields: ['one and a half', 'two', 'three-and-four', 'five & six'], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).toBe(`one and a half,two,three-and-four,five & six\n`); }); @@ -823,7 +849,7 @@ describe('CSV Execute Job', function () { fields: ['one', 'two'], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).not.toBe(null); const lines = content!.split('\n'); const headerLine = lines[0]; @@ -847,7 +873,7 @@ describe('CSV Execute Job', function () { conflictedTypesFields: [], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).not.toBe(null); const lines = content!.split('\n'); const valuesLine = lines[1]; @@ -879,7 +905,7 @@ describe('CSV Execute Job', function () { conflictedTypesFields: [], searchRequest: { index: null, body: null }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).not.toBe(null); const lines = content!.split('\n'); @@ -913,7 +939,7 @@ describe('CSV Execute Job', function () { }, }, }); - const { content } = await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(content).not.toBe(null); const lines = content!.split('\n'); @@ -927,7 +953,6 @@ describe('CSV Execute Job', function () { // tests use these 'simple' characters to make the math easier describe('when only the headers exceed the maxSizeBytes', function () { - let content: string | null; let maxSizeReached: boolean | undefined; beforeEach(async function () { @@ -940,10 +965,11 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - ({ content, max_size_reached: maxSizeReached } = await runTask( + ({ max_size_reached: maxSizeReached } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream )); }); @@ -957,7 +983,6 @@ describe('CSV Execute Job', function () { }); describe('when headers are equal to maxSizeBytes', function () { - let content: string | null; let maxSizeReached: boolean | undefined; beforeEach(async function () { @@ -970,10 +995,11 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - ({ content, max_size_reached: maxSizeReached } = await runTask( + ({ max_size_reached: maxSizeReached } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream )); }); @@ -987,7 +1013,6 @@ describe('CSV Execute Job', function () { }); describe('when the data exceeds the maxSizeBytes', function () { - let content: string | null; let maxSizeReached: boolean | undefined; beforeEach(async function () { @@ -1010,10 +1035,11 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - ({ content, max_size_reached: maxSizeReached } = await runTask( + ({ max_size_reached: maxSizeReached } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream )); }); @@ -1027,7 +1053,6 @@ describe('CSV Execute Job', function () { }); describe('when headers and data equal the maxSizeBytes', function () { - let content: string | null; let maxSizeReached: boolean | undefined; beforeEach(async function () { @@ -1052,10 +1077,11 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - ({ content, max_size_reached: maxSizeReached } = await runTask( + ({ max_size_reached: maxSizeReached } = await runTask( 'job123', jobParams, - cancellationToken + cancellationToken, + stream )); }); @@ -1091,7 +1117,7 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(mockEsClient.search).toHaveBeenCalledWith( expect.objectContaining({ scroll: scrollDuration }) @@ -1119,7 +1145,7 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(mockEsClient.search).toHaveBeenCalledWith( expect.objectContaining({ size: scrollSize }) @@ -1147,7 +1173,7 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); - await runTask('job123', jobParams, cancellationToken); + await runTask('job123', jobParams, cancellationToken, stream); expect(mockEsClient.scroll).toHaveBeenCalledWith( expect.objectContaining({ body: { scroll: scrollDuration, scroll_id: 'scrollId' } }) diff --git a/x-pack/plugins/reporting/server/export_types/csv/execute_job.ts b/x-pack/plugins/reporting/server/export_types/csv/execute_job.ts index 57559d136ff3e..322cde38d7fd6 100644 --- a/x-pack/plugins/reporting/server/export_types/csv/execute_job.ts +++ b/x-pack/plugins/reporting/server/export_types/csv/execute_job.ts @@ -16,7 +16,7 @@ export const runTaskFnFactory: RunTaskFnFactory< > = function executeJobFactoryFn(reporting, parentLogger) { const config = reporting.getConfig(); - return async function runTask(jobId, job, cancellationToken) { + return async function runTask(jobId, job, cancellationToken, stream) { const elasticsearch = await reporting.getEsClient(); const logger = parentLogger.clone([jobId]); const generateCsv = createGenerateCsv(logger); @@ -27,18 +27,18 @@ export const runTaskFnFactory: RunTaskFnFactory< const uiSettingsClient = await reporting.getUiSettingsClient(fakeRequest, logger); const { asCurrentUser: elasticsearchClient } = elasticsearch.asScoped(fakeRequest); - const { content, maxSizeReached, size, csvContainsFormulas, warnings } = await generateCsv( + const { maxSizeReached, size, csvContainsFormulas, warnings } = await generateCsv( job, config, uiSettingsClient, elasticsearchClient, - cancellationToken + cancellationToken, + stream ); // @TODO: Consolidate these one-off warnings into the warnings array (max-size reached and csv contains formulas) return { content_type: CONTENT_TYPE_CSV, - content, max_size_reached: maxSizeReached, size, csv_contains_formulas: csvContainsFormulas, diff --git a/x-pack/plugins/reporting/server/export_types/csv/generate_csv/index.ts b/x-pack/plugins/reporting/server/export_types/csv/generate_csv/index.ts index c1018030827c0..9dfc32b90e8c5 100644 --- a/x-pack/plugins/reporting/server/export_types/csv/generate_csv/index.ts +++ b/x-pack/plugins/reporting/server/export_types/csv/generate_csv/index.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import { i18n } from '@kbn/i18n'; import { ElasticsearchClient, IUiSettingsClient } from 'src/core/server'; import { ReportingConfig } from '../../../'; @@ -57,12 +58,17 @@ export function createGenerateCsv(logger: LevelLogger) { config: ReportingConfig, uiSettingsClient: IUiSettingsClient, elasticsearchClient: ElasticsearchClient, - cancellationToken: CancellationToken + cancellationToken: CancellationToken, + stream: Writable ): Promise { const settings = await getUiSettings(job.browserTimezone, uiSettingsClient, config, logger); const escapeValue = createEscapeValue(settings.quoteValues, settings.escapeFormulaValues); const bom = config.get('csv', 'useByteOrderMarkEncoding') ? CSV_BOM_CHARS : ''; - const builder = new MaxSizeStringBuilder(byteSizeValueToNumber(settings.maxSizeBytes), bom); + const builder = new MaxSizeStringBuilder( + stream, + byteSizeValueToNumber(settings.maxSizeBytes), + bom + ); const { fields, metaFields, conflictedTypesFields } = job; const header = `${fields.map(escapeValue).join(settings.separator)}\n`; @@ -71,7 +77,6 @@ export function createGenerateCsv(logger: LevelLogger) { if (!builder.tryAppend(header)) { return { size: 0, - content: '', maxSizeReached: true, warnings: [], }; @@ -148,7 +153,6 @@ export function createGenerateCsv(logger: LevelLogger) { } return { - content: builder.getString(), csvContainsFormulas: csvContainsFormulas && !settings.escapeFormulaValues, maxSizeReached, size, diff --git a/x-pack/plugins/reporting/server/export_types/csv/types.d.ts b/x-pack/plugins/reporting/server/export_types/csv/types.d.ts index 604d451d822b6..8ba87445efd9d 100644 --- a/x-pack/plugins/reporting/server/export_types/csv/types.d.ts +++ b/x-pack/plugins/reporting/server/export_types/csv/types.d.ts @@ -78,7 +78,6 @@ type FormatsMapDeprecatedCSV = Map< >; export interface SavedSearchGeneratorResultDeprecatedCSV { - content: string; size: number; maxSizeReached: boolean; csvContainsFormulas?: boolean; diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.test.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.test.ts index b96828bb06334..497a01d2587ad 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.test.ts @@ -9,12 +9,14 @@ jest.mock('./generate_csv/generate_csv', () => ({ CsvGenerator: class CsvGeneratorMock { generateData() { return { - content: 'test\n123', + size: 123, + content_type: 'text/csv', }; } }, })); +import { Writable } from 'stream'; import nodeCrypto from '@elastic/node-crypto'; import { ReportingCore } from '../../'; import { CancellationToken } from '../../../common'; @@ -30,6 +32,7 @@ const encryptionKey = 'tetkey'; const headers = { sid: 'cooltestheaders' }; let encryptedHeaders: string; let reportingCore: ReportingCore; +let stream: jest.Mocked; beforeAll(async () => { const crypto = nodeCrypto({ encryptionKey }); @@ -48,6 +51,10 @@ beforeAll(async () => { ); }); +beforeEach(() => { + stream = {} as typeof stream; +}); + test('gets the csv content from job parameters', async () => { const runTask = runTaskFnFactory(reportingCore, logger); @@ -61,13 +68,14 @@ test('gets the csv content from job parameters', async () => { title: 'Test Search', version: '7.13.0', }, - new CancellationToken() + new CancellationToken(), + stream ); expect(payload).toMatchInlineSnapshot(` Object { - "content": "test - 123", + "content_type": "text/csv", + "size": 123, } `); }); diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.ts index ff50377ab13c5..97f0aa65e3d68 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/execute_job.ts @@ -18,7 +18,7 @@ export const runTaskFnFactory: RunTaskFnFactory> = ( ) => { const config = reporting.getConfig(); - return async function runTask(jobId, job, cancellationToken) { + return async function runTask(jobId, job, cancellationToken, stream) { const logger = parentLogger.clone([CSV_JOB_TYPE, 'execute-job', jobId]); const encryptionKey = config.get('encryptionKey'); @@ -43,7 +43,15 @@ export const runTaskFnFactory: RunTaskFnFactory> = ( fieldFormatsRegistry, }; - const csv = new CsvGenerator(job, config, clients, dependencies, cancellationToken, logger); + const csv = new CsvGenerator( + job, + config, + clients, + dependencies, + cancellationToken, + logger, + stream + ); return await csv.generateData(); }; }; diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts index 596cb5a314414..b0a3a8c7001b1 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import * as Rx from 'rxjs'; import { identity, range } from 'lodash'; import { IScopedClusterClient, IUiSettingsClient, SearchResponse } from 'src/core/server'; @@ -40,6 +41,8 @@ let mockEsClient: IScopedClusterClient; let mockDataClient: IScopedSearchClient; let mockConfig: ReportingConfig; let uiSettingsClient: IUiSettingsClient; +let stream: jest.Mocked; +let content: string; const searchSourceMock = { ...searchSourceInstanceMock }; const mockSearchSourceService: jest.Mocked = { @@ -83,6 +86,8 @@ const mockFieldFormatsRegistry = ({ } as unknown) as FieldFormatsRegistry; beforeEach(async () => { + content = ''; + stream = ({ write: jest.fn((chunk) => (content += chunk)) } as unknown) as typeof stream; mockEsClient = elasticsearchServiceMock.createScopedClusterClient(); mockDataClient = dataPluginMock.createStartContract().search.asScoped({} as any); mockDataClient.search = mockDataClientSearchDefault; @@ -131,10 +136,11 @@ it('formats an empty search result to CSV content', async () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(false); }); @@ -170,10 +176,11 @@ it('formats a search result to CSV content', async () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(false); }); @@ -214,7 +221,8 @@ it('calculates the bytes of the content', async () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); expect(csvResult.size).toBe(2608); @@ -266,12 +274,13 @@ it('warns if max size was reached', async () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); expect(csvResult.max_size_reached).toBe(true); expect(csvResult.warnings).toEqual([]); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); }); it('uses the scrollId to page all the data', async () => { @@ -319,11 +328,12 @@ it('uses the scrollId to page all the data', async () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); expect(csvResult.warnings).toEqual([]); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(mockDataClient.search).toHaveBeenCalledTimes(1); expect(mockDataClient.search).toBeCalledWith( @@ -384,11 +394,12 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); - const csvResult = await generateCsv.generateData(); + await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); }); it('provides top-level underscored fields as columns', async () => { @@ -440,12 +451,13 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(false); }); @@ -504,12 +516,13 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(false); }); }); @@ -550,11 +563,12 @@ describe('fields from job.columns (7.13+ generated)', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); - const csvResult = await generateCsv.generateData(); + await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); }); it('columns can be top-level fields such as _id and _index', async () => { @@ -592,11 +606,12 @@ describe('fields from job.columns (7.13+ generated)', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); - const csvResult = await generateCsv.generateData(); + await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); }); it('empty columns defaults to using searchSource.getFields()', async () => { @@ -640,11 +655,12 @@ describe('fields from job.columns (7.13+ generated)', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); - const csvResult = await generateCsv.generateData(); + await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); }); }); @@ -684,12 +700,13 @@ describe('formulas', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(false); }); @@ -733,12 +750,13 @@ describe('formulas', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(false); }); @@ -785,12 +803,13 @@ describe('formulas', () => { fieldFormatsRegistry: mockFieldFormatsRegistry, }, new CancellationToken(), - logger + logger, + stream ); const csvResult = await generateCsv.generateData(); - expect(csvResult.content).toMatchSnapshot(); + expect(content).toMatchSnapshot(); expect(csvResult.csv_contains_formulas).toBe(true); }); }); diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index 7eaf1ef95c149..3855eff3821b9 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import { i18n } from '@kbn/i18n'; import type { estypes } from '@elastic/elasticsearch'; import { IScopedClusterClient, IUiSettingsClient } from 'src/core/server'; @@ -73,7 +74,8 @@ export class CsvGenerator { private clients: Clients, private dependencies: Dependencies, private cancellationToken: CancellationToken, - private logger: LevelLogger + private logger: LevelLogger, + private stream: Writable ) {} private async scan( @@ -290,7 +292,7 @@ export class CsvGenerator { const { maxSizeBytes, bom, escapeFormulaValues, scroll: scrollSettings } = settings; - const builder = new MaxSizeStringBuilder(byteSizeValueToNumber(maxSizeBytes), bom); + const builder = new MaxSizeStringBuilder(this.stream, byteSizeValueToNumber(maxSizeBytes), bom); const warnings: string[] = []; let first = true; let currentRecord = -1; @@ -403,7 +405,6 @@ export class CsvGenerator { ); return { - content: builder.getString(), content_type: CONTENT_TYPE_CSV, csv_contains_formulas: this.csvContainsFormulas && !escapeFormulaValues, max_size_reached: this.maxSizeReached, diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.test.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.test.ts index b0340a53fd603..27d3719f71f93 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.test.ts @@ -5,85 +5,86 @@ * 2.0. */ -import expect from '@kbn/expect'; +import { Writable } from 'stream'; import { MaxSizeStringBuilder } from './max_size_string_builder'; +let content: string; +let stream: jest.Mocked; + describe('MaxSizeStringBuilder', function () { + beforeEach(() => { + content = ''; + stream = ({ write: jest.fn((chunk) => (content += chunk)) } as unknown) as typeof stream; + }); + describe('tryAppend', function () { it(`should return true if appended string is under maxSize`, function () { - const builder = new MaxSizeStringBuilder(100); + const builder = new MaxSizeStringBuilder(stream, 100); const result = builder.tryAppend('aa'); - expect(result).to.be(true); + expect(result).toBe(true); }); it(`should return false if appended string is over the maxSize`, function () { - const builder = new MaxSizeStringBuilder(1); + const builder = new MaxSizeStringBuilder(stream, 1); const result = builder.tryAppend('aa'); - expect(result).to.be(false); + expect(result).toBe(false); }); it(`should return true then false if second appended string puts total size over the maxSize`, function () { - const builder = new MaxSizeStringBuilder(1); - expect(builder.tryAppend('a')).to.be(true); - expect(builder.tryAppend('a')).to.be(false); - }); - }); - - describe('getBuffer', function () { - it(`should return an empty string when we don't call tryAppend`, function () { - const builder = new MaxSizeStringBuilder(100); - expect(builder.getString()).to.be(''); + const builder = new MaxSizeStringBuilder(stream, 1); + expect(builder.tryAppend('a')).toBe(true); + expect(builder.tryAppend('a')).toBe(false); }); - it('should return equivalent string if tryAppend called once and less than maxSize', function () { + it('should write equivalent string if called once and less than maxSize', function () { const str = 'foo'; - const builder = new MaxSizeStringBuilder(100); + const builder = new MaxSizeStringBuilder(stream, 100); builder.tryAppend(str); - expect(builder.getString()).to.be(str); + expect(content).toBe(str); }); - it('should return equivalent string if tryAppend called multiple times and total size less than maxSize', function () { + it('should write equivalent string if called multiple times and total size less than maxSize', function () { const strs = ['foo', 'bar', 'baz']; - const builder = new MaxSizeStringBuilder(100); + const builder = new MaxSizeStringBuilder(stream, 100); strs.forEach((str) => builder.tryAppend(str)); - expect(builder.getString()).to.be(strs.join('')); + expect(content).toBe(strs.join('')); }); - it('should return empty string if tryAppend called one time with size greater than maxSize', function () { + it('should write empty string if called one time with size greater than maxSize', function () { const str = 'aa'; // each a is one byte - const builder = new MaxSizeStringBuilder(1); + const builder = new MaxSizeStringBuilder(stream, 1); builder.tryAppend(str); - expect(builder.getString()).to.be(''); + expect(content).toBe(''); }); - it('should return partial string if tryAppend called multiple times with total size greater than maxSize', function () { + it('should write partial string if called multiple times with total size greater than maxSize', function () { const str = 'a'; // each a is one byte - const builder = new MaxSizeStringBuilder(1); + const builder = new MaxSizeStringBuilder(stream, 1); builder.tryAppend(str); builder.tryAppend(str); - expect(builder.getString()).to.be('a'); + expect(content).toBe('a'); }); - it('should return string with bom character prepended', function () { + it('should write string with bom character prepended', function () { const str = 'a'; // each a is one byte - const builder = new MaxSizeStringBuilder(1, '∆'); + const builder = new MaxSizeStringBuilder(stream, 1, '∆'); builder.tryAppend(str); builder.tryAppend(str); - expect(builder.getString()).to.be('∆a'); + expect(content).toBe('∆a'); }); }); describe('getSizeInBytes', function () { it(`should return 0 when no strings have been appended`, function () { - const builder = new MaxSizeStringBuilder(100); - expect(builder.getSizeInBytes()).to.be(0); + const builder = new MaxSizeStringBuilder(stream, 100); + expect(builder.getSizeInBytes()).toBe(0); }); - it(`should the size in bytes`, function () { - const builder = new MaxSizeStringBuilder(100); + it(`should return the size in bytes`, function () { + const builder = new MaxSizeStringBuilder(stream, 100); const stringValue = 'foobar'; builder.tryAppend(stringValue); - expect(builder.getSizeInBytes()).to.be(stringValue.length); + expect(builder.getSizeInBytes()).toBe(stringValue.length); }); }); }); diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.ts index d5954083c5a0f..945464ac6c01c 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/max_size_string_builder.ts @@ -5,35 +5,32 @@ * 2.0. */ +import { Writable } from 'stream'; + export class MaxSizeStringBuilder { - private _buffer: Buffer; - private _size: number; - private _maxSize: number; - private _bom: string; - - constructor(maxSizeBytes: number, bom = '') { - this._buffer = Buffer.alloc(maxSizeBytes); - this._size = 0; - this._maxSize = maxSizeBytes; - this._bom = bom; - } + private size = 0; + private pristine = true; + + constructor(private stream: Writable, private maxSizeBytes: number, private bom = '') {} - tryAppend(str: string) { - const byteLength = Buffer.byteLength(str); - if (this._size + byteLength <= this._maxSize) { - this._buffer.write(str, this._size); - this._size += byteLength; - return true; + tryAppend(chunk: string): boolean { + const byteLength = Buffer.byteLength(chunk); + if (this.size + byteLength > this.maxSizeBytes) { + return false; } - return false; - } + if (this.pristine) { + this.stream.write(this.bom); + this.pristine = false; + } + + this.stream.write(chunk); + this.size += byteLength; - getSizeInBytes() { - return this._size; + return true; } - getString() { - return this._bom + this._buffer.slice(0, this._size).toString(); + getSizeInBytes(): number { + return this.size; } } diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource_immediate/execute_job.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource_immediate/execute_job.ts index e59c38e16ab47..c261fa62d97d4 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource_immediate/execute_job.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource_immediate/execute_job.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import { KibanaRequest } from 'src/core/server'; import { CancellationToken } from '../../../common'; import { CSV_SEARCHSOURCE_IMMEDIATE_TYPE } from '../../../common/constants'; @@ -22,6 +23,7 @@ export type ImmediateExecuteFn = ( jobId: null, job: JobParamsDownloadCSV, context: ReportingRequestHandlerContext, + stream: Writable, req: KibanaRequest ) => Promise; @@ -32,7 +34,7 @@ export const runTaskFnFactory: RunTaskFnFactory = function e const config = reporting.getConfig(); const logger = parentLogger.clone([CSV_SEARCHSOURCE_IMMEDIATE_TYPE, 'execute-job']); - return async function runTask(_jobId, immediateJobParams, context, req) { + return async function runTask(_jobId, immediateJobParams, context, stream, req) { const job = { objectType: 'immediate-search', ...immediateJobParams, @@ -58,7 +60,15 @@ export const runTaskFnFactory: RunTaskFnFactory = function e }; const cancellationToken = new CancellationToken(); - const csv = new CsvGenerator(job, config, clients, dependencies, cancellationToken, logger); + const csv = new CsvGenerator( + job, + config, + clients, + dependencies, + cancellationToken, + logger, + stream + ); const result = await csv.generateData(); if (result.csv_contains_formulas) { diff --git a/x-pack/plugins/reporting/server/export_types/png/execute_job/index.test.ts b/x-pack/plugins/reporting/server/export_types/png/execute_job/index.test.ts index ee264f7c57ff6..34cfa66ddd5e1 100644 --- a/x-pack/plugins/reporting/server/export_types/png/execute_job/index.test.ts +++ b/x-pack/plugins/reporting/server/export_types/png/execute_job/index.test.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import * as Rx from 'rxjs'; import { ReportingCore } from '../../../'; import { CancellationToken } from '../../../../common'; @@ -20,7 +21,9 @@ import { runTaskFnFactory } from './'; jest.mock('../lib/generate_png', () => ({ generatePngObservableFactory: jest.fn() })); +let content: string; let mockReporting: ReportingCore; +let stream: jest.Mocked; const cancellationToken = ({ on: jest.fn(), @@ -44,6 +47,9 @@ const encryptHeaders = async (headers: Record) => { const getBasePayload = (baseObj: any) => baseObj as TaskPayloadPNG; beforeEach(async () => { + content = ''; + stream = ({ write: jest.fn((chunk) => (content += chunk)) } as unknown) as typeof stream; + const mockReportingConfig = createMockConfigSchema({ index: '.reporting-2018.10.10', encryptionKey: mockEncryptionKey, @@ -75,7 +81,8 @@ test(`passes browserTimezone to generatePng`, async () => { browserTimezone, headers: encryptedHeaders, }), - cancellationToken + cancellationToken, + stream ); expect(generatePngObservable.mock.calls).toMatchInlineSnapshot(` @@ -119,7 +126,8 @@ test(`returns content_type of application/png`, async () => { const { content_type: contentType } = await runTask( 'pngJobId', getBasePayload({ relativeUrl: '/app/kibana#/something', headers: encryptedHeaders }), - cancellationToken + cancellationToken, + stream ); expect(contentType).toBe('image/png'); }); @@ -131,10 +139,11 @@ test(`returns content of generatePng getBuffer base64 encoded`, async () => { const runTask = await runTaskFnFactory(mockReporting, getMockLogger()); const encryptedHeaders = await encryptHeaders({}); - const { content } = await runTask( + await runTask( 'pngJobId', getBasePayload({ relativeUrl: '/app/kibana#/something', headers: encryptedHeaders }), - cancellationToken + cancellationToken, + stream ); expect(content).toEqual(testContent); diff --git a/x-pack/plugins/reporting/server/export_types/png/execute_job/index.ts b/x-pack/plugins/reporting/server/export_types/png/execute_job/index.ts index 1dffd0ed4dd5a..1027e895b2cd0 100644 --- a/x-pack/plugins/reporting/server/export_types/png/execute_job/index.ts +++ b/x-pack/plugins/reporting/server/export_types/png/execute_job/index.ts @@ -7,7 +7,7 @@ import apm from 'elastic-apm-node'; import * as Rx from 'rxjs'; -import { catchError, finalize, map, mergeMap, takeUntil } from 'rxjs/operators'; +import { catchError, finalize, map, mergeMap, takeUntil, tap } from 'rxjs/operators'; import { PNG_JOB_TYPE } from '../../../../common/constants'; import { TaskRunResult } from '../../../lib/tasks'; import { RunTaskFn, RunTaskFnFactory } from '../../../types'; @@ -26,7 +26,7 @@ export const runTaskFnFactory: RunTaskFnFactory< const config = reporting.getConfig(); const encryptionKey = config.get('encryptionKey'); - return async function runTask(jobId, job, cancellationToken) { + return async function runTask(jobId, job, cancellationToken, stream) { const apmTrans = apm.startTransaction('reporting execute_job png', 'reporting'); const apmGetAssets = apmTrans?.startSpan('get_assets', 'setup'); let apmGeneratePng: { end: () => void } | null | undefined; @@ -51,9 +51,9 @@ export const runTaskFnFactory: RunTaskFnFactory< job.layout ); }), + tap(({ base64 }) => stream.write(base64)), map(({ base64, warnings }) => ({ content_type: 'image/png', - content: base64, size: (base64 && base64.length) || 0, warnings, })), diff --git a/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.test.ts b/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.test.ts index a9863a7edf607..d58ec4cde0f3d 100644 --- a/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.test.ts +++ b/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.test.ts @@ -7,6 +7,7 @@ jest.mock('../lib/generate_pdf', () => ({ generatePdfObservableFactory: jest.fn() })); +import { Writable } from 'stream'; import * as Rx from 'rxjs'; import { ReportingCore } from '../../../'; import { CancellationToken } from '../../../../common'; @@ -16,7 +17,9 @@ import { generatePdfObservableFactory } from '../lib/generate_pdf'; import { TaskPayloadPDF } from '../types'; import { runTaskFnFactory } from './'; +let content: string; let mockReporting: ReportingCore; +let stream: jest.Mocked; const cancellationToken = ({ on: jest.fn(), @@ -40,6 +43,9 @@ const encryptHeaders = async (headers: Record) => { const getBasePayload = (baseObj: any) => baseObj as TaskPayloadPDF; beforeEach(async () => { + content = ''; + stream = ({ write: jest.fn((chunk) => (content += chunk)) } as unknown) as typeof stream; + const reportingConfig = { 'server.basePath': '/sbp', index: '.reports-test', @@ -71,7 +77,8 @@ test(`passes browserTimezone to generatePdf`, async () => { browserTimezone, headers: encryptedHeaders, }), - cancellationToken + cancellationToken, + stream ); const tzParam = generatePdfObservable.mock.calls[0][3]; @@ -89,7 +96,8 @@ test(`returns content_type of application/pdf`, async () => { const { content_type: contentType } = await runTask( 'pdfJobId', getBasePayload({ relativeUrls: [], headers: encryptedHeaders }), - cancellationToken + cancellationToken, + stream ); expect(contentType).toBe('application/pdf'); }); @@ -101,10 +109,11 @@ test(`returns content of generatePdf getBuffer base64 encoded`, async () => { const runTask = runTaskFnFactory(mockReporting, getMockLogger()); const encryptedHeaders = await encryptHeaders({}); - const { content } = await runTask( + await runTask( 'pdfJobId', getBasePayload({ relativeUrls: [], headers: encryptedHeaders }), - cancellationToken + cancellationToken, + stream ); expect(content).toEqual(Buffer.from(testContent).toString('base64')); diff --git a/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.ts b/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.ts index 8e215f87b52e0..a878c51ba02e2 100644 --- a/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.ts +++ b/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.ts @@ -27,7 +27,7 @@ export const runTaskFnFactory: RunTaskFnFactory< const config = reporting.getConfig(); const encryptionKey = config.get('encryptionKey'); - return async function runTask(jobId, job, cancellationToken) { + return async function runTask(jobId, job, cancellationToken, stream) { const jobLogger = parentLogger.clone([PDF_JOB_TYPE, 'execute-job', jobId]); const apmTrans = apm.startTransaction('reporting execute_job pdf', 'reporting'); const apmGetAssets = apmTrans?.startSpan('get_assets', 'setup'); @@ -46,7 +46,7 @@ export const runTaskFnFactory: RunTaskFnFactory< const urls = getFullUrls(config, job); const { browserTimezone, layout, title } = job; - if (apmGetAssets) apmGetAssets.end(); + apmGetAssets?.end(); apmGeneratePdf = apmTrans?.startSpan('generate_pdf_pipeline', 'execute'); return generatePdfObservable( @@ -60,15 +60,15 @@ export const runTaskFnFactory: RunTaskFnFactory< ); }), map(({ buffer, warnings }) => { - if (apmGeneratePdf) apmGeneratePdf.end(); - + apmGeneratePdf?.end(); const apmEncode = apmTrans?.startSpan('encode_pdf', 'output'); const content = buffer?.toString('base64') || null; - if (apmEncode) apmEncode.end(); + apmEncode?.end(); + + stream.write(content); return { content_type: 'application/pdf', - content, size: buffer?.byteLength || 0, warnings, }; @@ -81,7 +81,7 @@ export const runTaskFnFactory: RunTaskFnFactory< const stop$ = Rx.fromEventPattern(cancellationToken.on); - if (apmTrans) apmTrans.end(); + apmTrans?.end(); return process$.pipe(takeUntil(stop$)).toPromise(); }; }; diff --git a/x-pack/plugins/reporting/server/lib/store/store.ts b/x-pack/plugins/reporting/server/lib/store/store.ts index c8c5f82b59039..da6460aa6a2a7 100644 --- a/x-pack/plugins/reporting/server/lib/store/store.ts +++ b/x-pack/plugins/reporting/server/lib/store/store.ts @@ -9,7 +9,7 @@ import { IndexResponse, UpdateResponse } from '@elastic/elasticsearch/api/types' import { ElasticsearchClient } from 'src/core/server'; import { LevelLogger, statuses } from '../'; import { ReportingCore } from '../../'; -import { JobStatus, TaskRunResult } from '../../../common/types'; +import { JobStatus, ReportOutput } from '../../../common/types'; import { ILM_POLICY_NAME } from '../../../common/constants'; @@ -36,12 +36,12 @@ export type ReportProcessingFields = Required<{ export type ReportFailedFields = Required<{ completed_at: Report['completed_at']; - output: Report['output']; + output: ReportOutput | null; }>; export type ReportCompletedFields = Required<{ completed_at: Report['completed_at']; - output: Omit | null; + output: Omit | null; }>; /* diff --git a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts index b0c8978a02f6a..1bbaa406088af 100644 --- a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts +++ b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { Readable, pipeline } from 'stream'; +import { Writable, finished } from 'stream'; import { promisify } from 'util'; import { UpdateResponse } from '@elastic/elasticsearch/api/types'; import moment from 'moment'; @@ -19,6 +19,7 @@ import { TaskRunCreatorFunction, } from '../../../../task_manager/server'; import { CancellationToken } from '../../../common'; +import { ReportOutput } from '../../../common/types'; import { durationToNumber, numberToDuration } from '../../../common/schema_utils'; import { ReportingConfigType } from '../../config'; import { BasePayload, RunTaskFn } from '../../types'; @@ -40,8 +41,8 @@ interface ReportingExecuteTaskInstance { runAt?: Date; } -function isOutput(output: TaskRunResult | Error): output is TaskRunResult { - return typeof output === 'object' && (output as TaskRunResult).content != null; +function isOutput(output: any): output is TaskRunResult { + return output?.size != null; } function reportFromTask(task: ReportTaskParams) { @@ -203,12 +204,11 @@ export class ExecuteReportTask implements ReportingTask { return await store.setReportFailed(report, doc); } - private _formatOutput(output: TaskRunResult | Error): TaskRunResult { - const docOutput = {} as TaskRunResult; + private _formatOutput(output: TaskRunResult | Error): ReportOutput { + const docOutput = {} as ReportOutput; const unknownMime = null; if (isOutput(output)) { - docOutput.content = output.content; docOutput.content_type = output.content_type || unknownMime; docOutput.max_size_reached = output.max_size_reached; docOutput.csv_contains_formulas = output.csv_contains_formulas; @@ -227,7 +227,8 @@ export class ExecuteReportTask implements ReportingTask { public async _performJob( task: ReportTaskParams, - cancellationToken: CancellationToken + cancellationToken: CancellationToken, + stream: Writable ): Promise { if (!this.taskExecutors) { throw new Error(`Task run function factories have not been called yet!`); @@ -242,7 +243,7 @@ export class ExecuteReportTask implements ReportingTask { // run the report // if workerFn doesn't finish before timeout, call the cancellationToken and throw an error const queueTimeout = durationToNumber(this.config.queue.timeout); - return Rx.from(runner(task.id, task.payload, cancellationToken)) + return Rx.from(runner(task.id, task.payload, cancellationToken, stream)) .pipe(timeout(queueTimeout)) // throw an error if a value is not emitted before timeout .toPromise(); } @@ -253,18 +254,7 @@ export class ExecuteReportTask implements ReportingTask { this.logger.debug(`Saving ${report.jobtype} to ${docId}.`); const completedTime = moment().toISOString(); - const { content, ...docOutput } = this._formatOutput(output); - const stream = await getContentStream(this.reporting, { - id: report._id, - index: report._index!, - if_primary_term: report._primary_term, - if_seq_no: report._seq_no, - }); - - await promisify(pipeline)(Readable.from(content ?? ''), stream); - report._seq_no = stream.getSeqNo(); - report._primary_term = stream.getPrimaryTerm(); - + const docOutput = this._formatOutput(output); const store = await this.getStore(); const doc = { completed_at: completedTime, @@ -332,7 +322,20 @@ export class ExecuteReportTask implements ReportingTask { this.logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`); try { - const output = await this._performJob(task, cancellationToken); + const stream = await getContentStream(this.reporting, { + id: report._id, + index: report._index!, + if_primary_term: report._primary_term, + if_seq_no: report._seq_no, + }); + const output = await this._performJob(task, cancellationToken, stream); + + stream.end(); + await promisify(finished)(stream, { readable: false }); + + report._seq_no = stream.getSeqNo(); + report._primary_term = stream.getPrimaryTerm(); + if (output) { report = await this._completeJob(report, output); } diff --git a/x-pack/plugins/reporting/server/routes/csv_searchsource_immediate.ts b/x-pack/plugins/reporting/server/routes/csv_searchsource_immediate.ts index 8d31c03c618c9..3d482d4f84d52 100644 --- a/x-pack/plugins/reporting/server/routes/csv_searchsource_immediate.ts +++ b/x-pack/plugins/reporting/server/routes/csv_searchsource_immediate.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import { schema } from '@kbn/config-schema'; import { KibanaRequest } from 'src/core/server'; import { ReportingCore } from '../'; @@ -67,11 +68,23 @@ export function registerGenerateCsvFromSavedObjectImmediate( const runTaskFn = runTaskFnFactory(reporting, logger); try { + let buffer = Buffer.from(''); + const stream = new Writable({ + write(chunk, encoding, callback) { + buffer = Buffer.concat([ + buffer, + Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding), + ]); + callback(); + }, + }); + const { content_type: jobOutputContentType, - content: jobOutputContent, size: jobOutputSize, - }: TaskRunResult = await runTaskFn(null, req.body, context, req); + }: TaskRunResult = await runTaskFn(null, req.body, context, stream, req); + stream.end(); + const jobOutputContent = buffer.toString(); logger.info(`Job output size: ${jobOutputSize} bytes`); diff --git a/x-pack/plugins/reporting/server/types.ts b/x-pack/plugins/reporting/server/types.ts index da228b09f79d2..16cd247b4d00e 100644 --- a/x-pack/plugins/reporting/server/types.ts +++ b/x-pack/plugins/reporting/server/types.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { Writable } from 'stream'; import type { IRouter, KibanaRequest, RequestHandlerContext } from 'src/core/server'; // eslint-disable-next-line @kbn/eslint/no-restricted-paths import { DataPluginStart } from 'src/plugins/data/server/plugin'; @@ -76,7 +77,8 @@ export type CreateJobFn = ( jobId: string, payload: ReportTaskParams['payload'], - cancellationToken: CancellationToken + cancellationToken: CancellationToken, + stream: Writable ) => Promise; export type CreateJobFnFactory = (