From 23f4e292783b47334790404190dfd7ad8655a28a Mon Sep 17 00:00:00 2001 From: Michael Dokolin Date: Wed, 28 Jul 2021 20:56:12 +0200 Subject: [PATCH] Move report contents writing to the content stream --- .../reporting/server/lib/store/store.ts | 6 +++--- .../server/lib/tasks/execute_report.ts | 18 ++++++++++++++++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/x-pack/plugins/reporting/server/lib/store/store.ts b/x-pack/plugins/reporting/server/lib/store/store.ts index 40a73f294c5a9..c8c5f82b59039 100644 --- a/x-pack/plugins/reporting/server/lib/store/store.ts +++ b/x-pack/plugins/reporting/server/lib/store/store.ts @@ -9,7 +9,7 @@ import { IndexResponse, UpdateResponse } from '@elastic/elasticsearch/api/types' import { ElasticsearchClient } from 'src/core/server'; import { LevelLogger, statuses } from '../'; import { ReportingCore } from '../../'; -import { JobStatus } from '../../../common/types'; +import { JobStatus, TaskRunResult } from '../../../common/types'; import { ILM_POLICY_NAME } from '../../../common/constants'; @@ -41,7 +41,7 @@ export type ReportFailedFields = Required<{ export type ReportCompletedFields = Required<{ completed_at: Report['completed_at']; - output: Report['output']; + output: Omit | null; }>; /* @@ -354,7 +354,7 @@ export class ReportingStore { const doc = sourceDoc({ ...completedInfo, status, - }); + } as ReportSource); try { checkReportIsEditable(report); diff --git a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts index a52f452436d6d..1fb07547fa903 100644 --- a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts +++ b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts @@ -5,11 +5,13 @@ * 2.0. */ +import { Readable, pipeline } from 'stream'; +import { promisify } from 'util'; import { UpdateResponse } from '@elastic/elasticsearch/api/types'; import moment from 'moment'; import * as Rx from 'rxjs'; import { timeout } from 'rxjs/operators'; -import { LevelLogger } from '../'; +import { LevelLogger, getContentStreamFactory } from '../'; import { ReportingCore } from '../../'; import { RunContext, @@ -55,6 +57,7 @@ export class ExecuteReportTask implements ReportingTask { private kibanaId?: string; private kibanaName?: string; private store?: ReportingStore; + private getContentStream: ReturnType; constructor( private reporting: ReportingCore, @@ -62,6 +65,7 @@ export class ExecuteReportTask implements ReportingTask { logger: LevelLogger ) { this.logger = logger.clone(['runTask']); + this.getContentStream = getContentStreamFactory(reporting); } /* @@ -251,7 +255,17 @@ export class ExecuteReportTask implements ReportingTask { this.logger.debug(`Saving ${report.jobtype} to ${docId}.`); const completedTime = moment().toISOString(); - const docOutput = this._formatOutput(output); + const { content, ...docOutput } = this._formatOutput(output); + const stream = await this.getContentStream({ + id: report._id, + index: report._index!, + if_primary_term: report._primary_term, + if_seq_no: report._seq_no, + }); + + await promisify(pipeline)(Readable.from(content ?? ''), stream); + report._seq_no = stream.getSeqNo(); + report._primary_term = stream.getPrimaryTerm(); const store = await this.getStore(); const doc = {