Skip to content

Commit

Permalink
Make task manager code use the same logger (elastic#192574)
Browse files Browse the repository at this point in the history
In this PR, I'm making the sub-loggers within task manager use the main
logger so we can observe the logs under
`log.logger:"plugin.taskManager"`. To preserve separation, I moved the
sub-logger name within a tag so we can still filter the logs via
`tags:"taskClaimer"`.

The wrapped_logger.ts file is copied from
`x-pack/plugins/alerting/server/task_runner/lib/task_runner_logger.ts`.

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
mikecote and elasticmachine authored Sep 16, 2024
1 parent 61fcb0f commit a0973d6
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@ function createRoot(settings = {}) {
name: 'plugins.taskManager',
level: 'all',
},
{
name: 'plugins.taskManager.metrics-debugger',
level: 'warn',
},
{
name: 'plugins.taskManager.metrics-subscribe-debugger',
level: 'warn',
},
],
},
},
Expand Down
98 changes: 98 additions & 0 deletions x-pack/plugins/task_manager/server/lib/wrapped_logger.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { loggingSystemMock } from '@kbn/core/server/mocks';
import { LogLevel, LogRecord } from '@kbn/logging';
import { createWrappedLogger } from './wrapped_logger';

describe('createWrappedLogger', () => {
test('should inject baseline tags into log messages', () => {
const logger: ReturnType<typeof loggingSystemMock.createLogger> =
loggingSystemMock.createLogger();
const taskRunnerLogger = createWrappedLogger({ logger, tags: ['tag-1', 'tag-2'] });

taskRunnerLogger.trace('test trace message', { tags: ['tag-3'] });
taskRunnerLogger.debug('test debug message', { tags: ['tag-4'] });
taskRunnerLogger.info('test info message', { tags: ['tag-5'] });
taskRunnerLogger.warn('test warn message', { tags: ['tag-6'] });
taskRunnerLogger.error('test error message', { tags: ['tag-7'] });
taskRunnerLogger.fatal('test fatal message', { tags: ['tag-8'] });

expect(logger.trace).toHaveBeenCalledWith('test trace message', {
tags: ['tag-1', 'tag-2', 'tag-3'],
});
expect(logger.debug).toHaveBeenCalledWith('test debug message', {
tags: ['tag-1', 'tag-2', 'tag-4'],
});
expect(logger.info).toHaveBeenCalledWith('test info message', {
tags: ['tag-1', 'tag-2', 'tag-5'],
});
expect(logger.warn).toHaveBeenCalledWith('test warn message', {
tags: ['tag-1', 'tag-2', 'tag-6'],
});
expect(logger.error).toHaveBeenCalledWith('test error message', {
tags: ['tag-1', 'tag-2', 'tag-7'],
});
expect(logger.fatal).toHaveBeenCalledWith('test fatal message', {
tags: ['tag-1', 'tag-2', 'tag-8'],
});
});

test('should pass through other meta fields', () => {
const logger: ReturnType<typeof loggingSystemMock.createLogger> =
loggingSystemMock.createLogger();
const taskRunnerLogger = createWrappedLogger({ logger, tags: ['tag-1', 'tag-2'] });

taskRunnerLogger.trace('test trace message', { labels: { foo: 'bar' } });
taskRunnerLogger.debug('test debug message', { tags: ['tag-4'], host: { cpu: { usage: 3 } } });
taskRunnerLogger.info('test info message');
taskRunnerLogger.warn('test warn message', { user: { email: 'abc@124.com' } });
taskRunnerLogger.error('test error message', { agent: { id: 'agent-1' } });
taskRunnerLogger.fatal('test fatal message');

expect(logger.trace).toHaveBeenCalledWith('test trace message', {
tags: ['tag-1', 'tag-2'],
labels: { foo: 'bar' },
});
expect(logger.debug).toHaveBeenCalledWith('test debug message', {
tags: ['tag-1', 'tag-2', 'tag-4'],
host: { cpu: { usage: 3 } },
});
expect(logger.info).toHaveBeenCalledWith('test info message', { tags: ['tag-1', 'tag-2'] });
expect(logger.warn).toHaveBeenCalledWith('test warn message', {
tags: ['tag-1', 'tag-2'],
user: { email: 'abc@124.com' },
});
expect(logger.error).toHaveBeenCalledWith('test error message', {
tags: ['tag-1', 'tag-2'],
agent: { id: 'agent-1' },
});
expect(logger.fatal).toHaveBeenCalledWith('test fatal message', { tags: ['tag-1', 'tag-2'] });
});

test('should pass through other functions', () => {
const logger: ReturnType<typeof loggingSystemMock.createLogger> =
loggingSystemMock.createLogger();
const taskRunnerLogger = createWrappedLogger({ logger, tags: ['tag-1', 'tag-2'] });

taskRunnerLogger.isLevelEnabled('debug');
expect(logger.isLevelEnabled).toHaveBeenCalledWith('debug');

taskRunnerLogger.get('prefix', 'another');
expect(logger.get).toHaveBeenCalledWith('prefix', 'another');

const logRecord: LogRecord = {
timestamp: new Date(),
level: LogLevel.Info,
context: 'context',
message: 'message',
pid: 1,
};
taskRunnerLogger.log(logRecord);
expect(logger.log).toHaveBeenCalledWith(logRecord);
});
});
74 changes: 74 additions & 0 deletions x-pack/plugins/task_manager/server/lib/wrapped_logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { Logger, LogMeta } from '@kbn/core/server';
import { LogLevelId, LogMessageSource, LogRecord } from '@kbn/logging';

interface WrappedLoggerOpts {
logger: Logger;
tags: string[];
}

export function createWrappedLogger(opts: WrappedLoggerOpts): Logger {
return new WrappedLogger(opts);
}

class WrappedLogger implements Logger {
private loggerMetaTags: string[] = [];

constructor(private readonly opts: WrappedLoggerOpts) {
this.loggerMetaTags = opts.tags;
}

trace<Meta extends LogMeta = LogMeta>(message: LogMessageSource, meta?: Meta) {
this.opts.logger.trace(message, { ...meta, tags: this.combineTags(meta?.tags) });
}

debug<Meta extends LogMeta = LogMeta>(message: LogMessageSource, meta?: Meta) {
this.opts.logger.debug(message, { ...meta, tags: this.combineTags(meta?.tags) });
}

info<Meta extends LogMeta = LogMeta>(message: LogMessageSource, meta?: Meta) {
this.opts.logger.info(message, { ...meta, tags: this.combineTags(meta?.tags) });
}

warn<Meta extends LogMeta = LogMeta>(errorOrMessage: LogMessageSource | Error, meta?: Meta) {
this.opts.logger.warn(errorOrMessage, { ...meta, tags: this.combineTags(meta?.tags) });
}

error<Meta extends LogMeta = LogMeta>(errorOrMessage: LogMessageSource | Error, meta?: Meta) {
this.opts.logger.error(errorOrMessage, { ...meta, tags: this.combineTags(meta?.tags) });
}

fatal<Meta extends LogMeta = LogMeta>(errorOrMessage: LogMessageSource | Error, meta?: Meta) {
this.opts.logger.fatal(errorOrMessage, { ...meta, tags: this.combineTags(meta?.tags) });
}

log(record: LogRecord) {
this.opts.logger.log(record);
}

isLevelEnabled(level: LogLevelId): boolean {
return this.opts.logger.isLevelEnabled(level);
}

get(...childContextPaths: string[]): Logger {
return this.opts.logger.get(...childContextPaths);
}

private combineTags(tags?: string[] | string): string[] {
if (!tags) {
return this.loggerMetaTags;
}

if (typeof tags === 'string') {
return [...new Set([...this.loggerMetaTags, tags])];
}

return [...new Set([...this.loggerMetaTags, ...tags])];
}
}
3 changes: 2 additions & 1 deletion x-pack/plugins/task_manager/server/metrics/metrics_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Logger } from '@kbn/core/server';
import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle';
import { TaskManagerConfig } from '../config';
import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { createWrappedLogger } from '../lib/wrapped_logger';
import {
isTaskManagerStatEvent,
isTaskManagerMetricEvent,
Expand Down Expand Up @@ -52,7 +53,7 @@ export function createMetricsAggregators({
taskManagerMetricsCollector,
}: CreateMetricsAggregatorsOpts): AggregatedStatProvider {
const aggregators: AggregatedStatProvider[] = [];
const debugLogger = logger.get('metrics-debugger');
const debugLogger = createWrappedLogger({ logger, tags: ['metrics-debugger'] });
if (taskPollingLifecycle) {
aggregators.push(
createAggregator({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ describe('TaskClaiming', () => {
});

expect(taskManagerLogger.warn).toHaveBeenCalledWith(
'Unknown task claiming strategy "non-default", falling back to update_by_query'
'Unknown task claiming strategy "non-default", falling back to update_by_query',
{ tags: ['taskClaiming'] }
);
});

Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/task_manager/server/queries/task_claiming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
getTaskClaimer,
} from '../task_claimers';
import { TaskPartitioner } from '../lib/task_partitioner';
import { createWrappedLogger } from '../lib/wrapped_logger';

export type { ClaimOwnershipResult } from '../task_claimers';
export interface TaskClaimingOpts {
Expand Down Expand Up @@ -107,7 +108,7 @@ export class TaskClaiming {
this.maxAttempts = opts.maxAttempts;
this.taskStore = opts.taskStore;
this.getAvailableCapacity = opts.getAvailableCapacity;
this.logger = opts.logger.get('taskClaiming');
this.logger = createWrappedLogger({ logger: opts.logger, tags: ['taskClaiming'] });
this.taskClaimingBatchesByType = this.partitionIntoClaimingBatches(this.definitions);
this.taskMaxAttempts = Object.fromEntries(this.normalizeMaxAttempts(this.definitions));
this.excludedTaskTypes = opts.excludedTaskTypes;
Expand Down
11 changes: 1 addition & 10 deletions x-pack/plugins/task_manager/server/routes/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,12 @@ const QuerySchema = schema.object({
});

export function metricsRoute(params: MetricsRouteParams) {
const { router, logger, metrics$, resetMetrics$, taskManagerId } = params;
const { router, metrics$, resetMetrics$, taskManagerId } = params;

const debugLogger = logger.get(`metrics-debugger`);
const additionalDebugLogger = logger.get(`metrics-subscribe-debugger`);
let lastMetrics: NodeMetrics | null = null;

metrics$.subscribe((metrics) => {
lastMetrics = { process_uuid: taskManagerId, timestamp: new Date().toISOString(), ...metrics };
additionalDebugLogger.debug(() => `subscribed metrics ${JSON.stringify(metrics)}`);
});

router.get(
Expand All @@ -68,12 +65,6 @@ export function metricsRoute(params: MetricsRouteParams) {
req: KibanaRequest<unknown, TypeOf<typeof QuerySchema>, unknown>,
res: KibanaResponseFactory
): Promise<IKibanaResponse> {
debugLogger.debug(
() =>
`/api/task_manager/metrics route accessed with reset=${req.query.reset} - metrics ${
lastMetrics ? JSON.stringify(lastMetrics) : 'not available'
}`
);
if (req.query.reset) {
resetMetrics$.next(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ describe('TaskClaiming', () => {

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 3; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
Expand Down Expand Up @@ -497,7 +497,7 @@ describe('TaskClaiming', () => {

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 2;',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
Expand Down Expand Up @@ -604,11 +604,11 @@ describe('TaskClaiming', () => {

expect(taskManagerLogger.warn).toHaveBeenCalledWith(
'Error updating task id-2:task to mark as unrecognized during claim: {"type":"document_missing_exception","reason":"[5]: document missing","index_uuid":"aAsFqTI0Tc2W0LCWgPNrOA","shard":"0","index":".kibana_task_manager_8.16.0_001"}',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 1;',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
Expand Down Expand Up @@ -701,11 +701,11 @@ describe('TaskClaiming', () => {

expect(taskManagerLogger.warn).toHaveBeenCalledWith(
'Error updating tasks to mark as unrecognized during claim: Error: Oh no',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
Expand Down Expand Up @@ -855,7 +855,7 @@ describe('TaskClaiming', () => {

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
Expand Down Expand Up @@ -948,7 +948,7 @@ describe('TaskClaiming', () => {

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
Expand Down Expand Up @@ -1041,7 +1041,7 @@ describe('TaskClaiming', () => {

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 2; stale: 1; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
Expand Down Expand Up @@ -1140,7 +1140,7 @@ describe('TaskClaiming', () => {

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
Expand Down Expand Up @@ -1271,11 +1271,11 @@ describe('TaskClaiming', () => {

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 1; removed: 0;',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.error).toHaveBeenCalledWith(
'Error getting full task id-2:task during claim: Oh no',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
Expand Down Expand Up @@ -1511,11 +1511,11 @@ describe('TaskClaiming', () => {

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 1; getErrors: 0; removed: 0;',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.error).toHaveBeenCalledWith(
'Error updating task id-2:task during claim: {"type":"document_missing_exception","reason":"[5]: document missing","index_uuid":"aAsFqTI0Tc2W0LCWgPNrOA","shard":"0","index":".kibana_task_manager_8.16.0_001"}',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
Expand Down Expand Up @@ -1644,7 +1644,7 @@ describe('TaskClaiming', () => {

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['claimAvailableTasksMget'] }
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.error).not.toHaveBeenCalled();

Expand Down Expand Up @@ -2000,7 +2000,8 @@ describe('TaskClaiming', () => {
] = claimedResults;

expect(taskManagerLogger.warn).toHaveBeenCalledWith(
'Background task node "test" has no assigned partitions, claiming against all partitions'
'Background task node "test" has no assigned partitions, claiming against all partitions',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(query).toMatchInlineSnapshot(`
Object {
Expand Down
Loading

0 comments on commit a0973d6

Please sign in to comment.