Unskip Execution Context FTRs (#149070)
Resolves #112102
afharo authored Jan 24, 2023
1 parent bbc909b commit 4c626f1
Showing 7 changed files with 339 additions and 155 deletions.
@@ -87,6 +87,7 @@ describe('ExecutionContextService', () => {
`
Object {
"name": "app2",
"type": "application",
"url": "/",
}
`
@@ -244,6 +245,7 @@ describe('ExecutionContextService', () => {
execContext.clear();
expect(sub).toHaveBeenCalledWith({
name: 'app1',
type: 'application',
url: '/',
});

@@ -98,6 +98,7 @@ export class ExecutionContextService

private getDefaultContext() {
return {
type: 'application',
name: this.appId,
url: window.location.pathname,
};
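For illustration only: after this change, the default context reported by the browser-side service carries a type field alongside name and url. A minimal sketch of the resulting object, assuming a hypothetical appId of 'dashboards' (the field names come from the diff above; the concrete values are examples):

// Illustrative shape of the default execution context after this change (not part of the commit).
const defaultContext = {
  type: 'application', // newly added field
  name: 'dashboards', // hypothetical appId
  url: window.location.pathname, // e.g. '/app/dashboards'
};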
106 changes: 96 additions & 10 deletions x-pack/test/functional_execution_context/test_utils.ts
@@ -9,17 +9,26 @@ import Path from 'path';
import { isEqualWith } from 'lodash';
import type { Ecs, KibanaExecutionContext } from '@kbn/core/server';
import type { RetryService } from '@kbn/ftr-common-functional-services';
import { concatMap, defer, filter, firstValueFrom, ReplaySubject, scan, timeout } from 'rxjs';

export const logFilePath = Path.resolve(__dirname, './kibana.log');
export const ANY = Symbol('any');

let logstream$: ReplaySubject<Ecs> | undefined;

export function getExecutionContextFromLogRecord(record: Ecs | undefined): KibanaExecutionContext {
if (record?.log?.logger !== 'execution_context' || !record?.message) {
throw new Error(`The record is not an entry of execution context`);
}
return JSON.parse(record.message);
}

export function isExecutionContextLog(
record: string | undefined,
record: Ecs | undefined,
executionContext: KibanaExecutionContext
) {
if (!record) return false;
try {
const object = JSON.parse(record);
const object = getExecutionContextFromLogRecord(record);
return isEqualWith(object, executionContext, function customizer(obj1: any, obj2: any) {
if (obj2 === ANY) return true;
});
@@ -41,12 +50,89 @@ export async function assertLogContains({
}): Promise<void> {
// logs are written to disk asynchronously. I sacrificed performance to reduce flakiness.
await retry.waitFor(description, async () => {
const logsStr = await Fs.readFile(logFilePath, 'utf-8');
const normalizedRecords = logsStr
.split(endOfLine)
.filter(Boolean)
.map((s) => JSON.parse(s));

return normalizedRecords.some(predicate);
if (!logstream$) {
logstream$ = getLogstream$();
}
try {
await firstValueFrom(logstream$.pipe(filter(predicate), timeout(5_000)));
return true;
} catch (err) {
return false;
}
});
}

/**
* Creates an observable that continuously tails the log file.
*/
function getLogstream$(): ReplaySubject<Ecs> {
const stream$ = new ReplaySubject<Ecs>();

defer(async function* () {
const fd = await Fs.open(logFilePath, 'rs');
while (!stream$.isStopped) {
const { bytesRead, buffer } = await fd.read();
if (bytesRead) {
yield buffer.toString('utf8', 0, bytesRead);
}
}
await fd.close();
})
.pipe(
scan<string, { buffer: string; records: Ecs[] }>(
({ buffer }, chunk) => {
const logString = buffer.concat(chunk);
const lines = logString.split(endOfLine);
const lastLine = lines.pop();
const records = lines.map((s) => JSON.parse(s));

let leftover = '';
if (lastLine) {
try {
const validRecord = JSON.parse(lastLine);
records.push(validRecord);
} catch (err) {
leftover = lastLine;
}
}

return { buffer: leftover, records };
},
{
records: [], // The ECS entries in the logs
buffer: '', // Accumulated leftovers from the previous operation
}
),
concatMap(({ records }) => records)
)
.subscribe(stream$);

// let the content start flowing
stream$.subscribe();

return stream$;
}

export function closeLogstream() {
logstream$?.complete();
logstream$ = undefined;
}

/**
* Truncates the log file to avoid tests looking at the logs from previous executions.
*/
export async function clearLogFile() {
closeLogstream();
await Fs.writeFile(logFilePath, '', 'utf8');
await forceSyncLogFile();
logstream$ = getLogstream$();
}

/**
* Force the completion of all the pending I/O operations in the OS related to the log file.
*/
export async function forceSyncLogFile() {
const fileDescriptor = await Fs.open(logFilePath);
await fileDescriptor.datasync();
await fileDescriptor.close();
}
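Taken together, these helpers let a functional test tail kibana.log continuously and wait for a matching execution-context entry instead of re-reading the whole file on every retry. A hypothetical usage sketch follows, assuming an FTR test file with the Mocha globals FTR provides; the app name, the provider-context import path, and the test wiring are illustrative and not part of this commit:

// example_test.ts — illustrative FTR test, not part of this commit
import type { Ecs } from '@kbn/core/server';
import type { FtrProviderContext } from '../ftr_provider_context'; // hypothetical path
import {
  ANY,
  assertLogContains,
  clearLogFile,
  forceSyncLogFile,
  isExecutionContextLog,
} from './test_utils';

export default function ({ getService }: FtrProviderContext) {
  const retry = getService('retry');

  describe('execution context propagation', () => {
    before(async () => {
      // Start from an empty log so entries from previous runs cannot satisfy the predicate.
      await clearLogFile();
    });

    it('logs the application execution context', async () => {
      // Flush pending OS writes so the tailing stream sees the latest entries.
      await forceSyncLogFile();

      await assertLogContains({
        description: 'execution context of the app is present in the logs',
        retry,
        predicate: (record: Ecs) =>
          isExecutionContextLog(record, {
            type: 'application',
            name: 'dashboards', // hypothetical app name
            url: ANY as unknown as string, // ANY matches any value via the isEqualWith customizer
          }),
      });
    });
  });
}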