Skip to content

Commit

Permalink
Move report contents writing to the content stream
Browse files Browse the repository at this point in the history
  • Loading branch information
dokmic committed Aug 4, 2021
1 parent 46ab6dd commit 23f4e29
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
6 changes: 3 additions & 3 deletions x-pack/plugins/reporting/server/lib/store/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { IndexResponse, UpdateResponse } from '@elastic/elasticsearch/api/types'
import { ElasticsearchClient } from 'src/core/server';
import { LevelLogger, statuses } from '../';
import { ReportingCore } from '../../';
import { JobStatus } from '../../../common/types';
import { JobStatus, TaskRunResult } from '../../../common/types';

import { ILM_POLICY_NAME } from '../../../common/constants';

Expand Down Expand Up @@ -41,7 +41,7 @@ export type ReportFailedFields = Required<{

export type ReportCompletedFields = Required<{
completed_at: Report['completed_at'];
output: Report['output'];
output: Omit<TaskRunResult, 'content'> | null;
}>;

/*
Expand Down Expand Up @@ -354,7 +354,7 @@ export class ReportingStore {
const doc = sourceDoc({
...completedInfo,
status,
});
} as ReportSource);

try {
checkReportIsEditable(report);
Expand Down
18 changes: 16 additions & 2 deletions x-pack/plugins/reporting/server/lib/tasks/execute_report.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
* 2.0.
*/

import { Readable, pipeline } from 'stream';
import { promisify } from 'util';
import { UpdateResponse } from '@elastic/elasticsearch/api/types';
import moment from 'moment';
import * as Rx from 'rxjs';
import { timeout } from 'rxjs/operators';
import { LevelLogger } from '../';
import { LevelLogger, getContentStreamFactory } from '../';
import { ReportingCore } from '../../';
import {
RunContext,
Expand Down Expand Up @@ -55,13 +57,15 @@ export class ExecuteReportTask implements ReportingTask {
private kibanaId?: string;
private kibanaName?: string;
private store?: ReportingStore;
private getContentStream: ReturnType<typeof getContentStreamFactory>;

constructor(
private reporting: ReportingCore,
private config: ReportingConfigType,
logger: LevelLogger
) {
this.logger = logger.clone(['runTask']);
this.getContentStream = getContentStreamFactory(reporting);
}

/*
Expand Down Expand Up @@ -251,7 +255,17 @@ export class ExecuteReportTask implements ReportingTask {
this.logger.debug(`Saving ${report.jobtype} to ${docId}.`);

const completedTime = moment().toISOString();
const docOutput = this._formatOutput(output);
const { content, ...docOutput } = this._formatOutput(output);
const stream = await this.getContentStream({
id: report._id,
index: report._index!,
if_primary_term: report._primary_term,
if_seq_no: report._seq_no,
});

await promisify(pipeline)(Readable.from(content ?? ''), stream);
report._seq_no = stream.getSeqNo();
report._primary_term = stream.getPrimaryTerm();

const store = await this.getStore();
const doc = {
Expand Down

0 comments on commit 23f4e29

Please sign in to comment.