Skip to content

Commit

Permalink
Send worker log messages to coordinator for appending.
Browse files Browse the repository at this point in the history
  • Loading branch information
lukeelmers committed Jul 19, 2021
1 parent 2aa578b commit 5368bf7
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 18 deletions.
2 changes: 1 addition & 1 deletion packages/kbn-logging/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/

export { LogLevel, LogLevelId } from './log_level';
export { LogRecord } from './log_record';
export { LogRecord, SerializableLogRecord } from './log_record';
export { Logger } from './logger';
export { LogMeta } from './log_meta';
export { LoggerFactory } from './logger_factory';
Expand Down
16 changes: 15 additions & 1 deletion packages/kbn-logging/src/log_record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

import { LogLevel } from './log_level';
import { LogLevel, LogLevelId } from './log_level';

/**
* Essential parts of every log message.
Expand All @@ -21,3 +21,17 @@ export interface LogRecord {
meta?: { [name: string]: any };
pid: number;
}

/**
* Serializable version of a log record, used for IPC.
* @internal
*/
export interface SerializableLogRecord {
timestamp: string; // ISO-8601 timestamp
level: LogLevelId;
context: string;
message: string;
error?: { name: string; message: string; stack?: string };
meta?: { [name: string]: any };
pid: number;
}
4 changes: 3 additions & 1 deletion src/core/server/config/setup_core_config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import { coreDeprecationProvider } from './deprecation';

const rootConfigPath = '';

// TODO: This is copied from Server, need to dedupe
// TODO: This is copied from Server, need to dedupe.
// We probably don't need to do this in the Server if
// it's already handled in the coordinator.
export function setupCoreConfig(configService: ConfigService) {
const configDescriptors: Array<ServiceConfigDescriptor<unknown>> = [
pathConfig,
Expand Down
74 changes: 70 additions & 4 deletions src/core/server/logging/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@
* Side Public License, v 1.
*/

import { Appender, LogLevel, LogRecord, LoggerFactory, LogMeta, Logger } from '@kbn/logging';
import {
Appender,
LogLevel,
LogRecord,
LoggerFactory,
LogMeta,
Logger,
SerializableLogRecord,
} from '@kbn/logging';
import { NodeInfo } from '../node';

function isError(x: any): x is Error {
return x instanceof Error;
Expand All @@ -18,7 +27,8 @@ export class BaseLogger implements Logger {
private readonly context: string,
private readonly level: LogLevel,
private readonly appenders: Appender[],
private readonly factory: LoggerFactory
private readonly factory: LoggerFactory,
private readonly nodeInfo: NodeInfo
) {}

public trace<Meta extends LogMeta = LogMeta>(message: string, meta?: Meta): void {
Expand Down Expand Up @@ -50,15 +60,31 @@ export class BaseLogger implements Logger {
return;
}

for (const appender of this.appenders) {
appender.append(record);
if (this.nodeInfo.isEnabled && !this.nodeInfo.isCoordinator) {
// If we are in clustering mode, and this is a worker,
// send to the coordinator instead of appending.
process.send!({
_kind: 'kibana-log-record',
payload: BaseLogger.toSerializableLogRecord(record),
workerId: this.nodeInfo.workerId,
});
} else {
// We are the coordinator or in non-clustering mode,
// so just append the record.
this.append(record);
}
}

public get(...childContextPaths: string[]): Logger {
return this.factory.get(...[this.context, ...childContextPaths]);
}

private append(record: LogRecord) {
for (const appender of this.appenders) {
appender.append(record);
}
}

private createLogRecord<Meta extends LogMeta>(
level: LogLevel,
errorOrMessage: string | Error,
Expand All @@ -85,4 +111,44 @@ export class BaseLogger implements Logger {
pid: process.pid,
};
}

/**
* If clustering is enabled, we need to be able to serialize a LogRecord
* so that it can be sent to the coordinator process via IPC.
*
* Converting each record to and from the serializable format is potentially
* expensive; ideally we'd consider changing the `LogRecord` interface into
* something fully serializable. Would need to assess how much impact this
* could have on the rest of Kibana.
*/
public static toSerializableLogRecord(record: LogRecord): SerializableLogRecord {
return {
...record, // TODO: shallow clone might not be necessary
level: record.level.id,
timestamp:
typeof record.timestamp === 'string' ? record.timestamp : record.timestamp.toISOString(),
error: record.error
? {
message: record.error.message,
name: record.error.name,
stack: record.error.stack,
}
: undefined,
};
}

public static fromSerializableLogRecord(record: SerializableLogRecord): LogRecord {
const error = record.error ? new Error(record.error.message) : undefined;
if (error) {
error.name = record.error!.name;
error.stack = record.error?.stack;
}

return {
...record, // TODO: shallow clone might not be necessary
level: LogLevel.fromId(record.level),
timestamp: new Date(record.timestamp),
error,
};
}
}
11 changes: 11 additions & 0 deletions src/core/server/logging/logging_config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ export interface LoggerContextConfigInput {
* @internal
*/
export class LoggingConfig {
/**
* Helper method that takes a single context string and splits it into separated context parts.
* In case the provided context is an empty string, `root` context name is returned.
* @param context The context string (e.g. 'parent.child').
* @returns {string[]} Separated context parts (e.g. ['parent', 'child']).
*/
public static getLoggerContextParts(context: string) {
const parts = context.split(CONTEXT_SEPARATOR);
return parts.length ? parts : [ROOT_CONTEXT_NAME];
}

/**
* Helper method that joins separate string context parts into single context string.
* In case joined context is an empty string, `root` context name is returned.
Expand Down
19 changes: 16 additions & 3 deletions src/core/server/logging/logging_system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import type { PublicMethodsOf } from '@kbn/utility-types';
import { DisposableAppender, LogLevel, Logger, LoggerFactory } from '@kbn/logging';
import { NodeInfo } from '../node';
import { Appenders } from './appenders/appenders';
import { BufferAppender } from './appenders/buffer/buffer_appender';
import { BaseLogger } from './logger';
Expand Down Expand Up @@ -38,7 +39,7 @@ export class LoggingSystem implements LoggerFactory {
private readonly loggers: Map<string, LoggerAdapter> = new Map();
private readonly contextConfigs = new Map<string, LoggerContextConfigType>();

constructor() {}
constructor(private readonly nodeInfo: NodeInfo) {}

public get(...contextParts: string[]): Logger {
const context = LoggingConfig.getLoggerContext(contextParts);
Expand Down Expand Up @@ -124,14 +125,26 @@ export class LoggingSystem implements LoggerFactory {
if (config === undefined) {
// If we don't have config yet, use `buffered` appender that will store all logged messages in the memory
// until the config is ready.
return new BaseLogger(context, LogLevel.All, [this.bufferAppender], this.asLoggerFactory());
return new BaseLogger(
context,
LogLevel.All,
[this.bufferAppender],
this.asLoggerFactory(),
this.nodeInfo
);
}

const { level, appenders } = this.getLoggerConfigByContext(config, context);
const loggerLevel = LogLevel.fromId(level);
const loggerAppenders = appenders.map((appenderKey) => this.appenders.get(appenderKey)!);

return new BaseLogger(context, loggerLevel, loggerAppenders, this.asLoggerFactory());
return new BaseLogger(
context,
loggerLevel,
loggerAppenders,
this.asLoggerFactory(),
this.nodeInfo
);
}

private getLoggerConfigByContext(config: LoggingConfig, context: string): LoggerConfigType {
Expand Down
16 changes: 13 additions & 3 deletions src/core/server/node/node_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,30 @@

import cluster from 'cluster';
import { omit } from 'lodash';
import { Logger } from '@kbn/logging';
import { Logger, LoggerFactory } from '@kbn/logging';
import { ConfigService } from '../config';
import { BaseLogger } from '../logging/logger';
import { LoggingConfig } from '../logging/logging_config';
import {
NodeConfigType,
WorkersConfigType,
WorkerConfig,
config as nodeConfig,
} from './node_config';
import { TransferBroadcastMessage } from './types';
import { isBroadcastMessage } from './utils';
import { isBroadcastMessage, isLogRecordMessage } from './utils';

/**
* Coordinator-side node clustering service
*/
export class NodeManager {
private config?: NodeConfigType;

constructor(private readonly configService: ConfigService, private readonly log: Logger) {}
constructor(
private readonly configService: ConfigService,
private readonly logger: LoggerFactory,
private readonly log: Logger
) {}

public async setup() {
this.config = this.configService.atPathSync<NodeConfigType>(nodeConfig.path);
Expand Down Expand Up @@ -56,6 +62,10 @@ export class NodeManager {
const handleWorkerMessage = (workerId: number, message: any) => {
if (isBroadcastMessage(message)) {
this.broadcast(message, workerId);
} else if (isLogRecordMessage(message)) {
const context = LoggingConfig.getLoggerContextParts(message.payload.context);
const log = this.logger.get(...context);
log.log(BaseLogger.fromSerializableLogRecord(message.payload));
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/core/server/node/node_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class NodeService {
private readonly messageHandlers = new Map<string, MessageHandler[]>();

constructor(coreContext: CoreContext) {
this.log = coreContext.logger.get('node');
this.log = coreContext.logger.get('node', 'service');
this.configService = coreContext.configService;
}

Expand Down
8 changes: 8 additions & 0 deletions src/core/server/node/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Side Public License, v 1.
*/

import { SerializableLogRecord } from '@kbn/logging';
import { Serializable } from '../../types';

/** @public */
Expand All @@ -18,6 +19,7 @@ export interface BroadcastOptions {
*/
sendToSelf?: boolean;
}

/** @internal */
export interface TransferBroadcastMessage {
_kind: 'kibana-broadcast';
Expand All @@ -26,6 +28,12 @@ export interface TransferBroadcastMessage {
options?: BroadcastOptions;
}

/** @internal */
export interface LogRecordMessage {
_kind: 'kibana-log-record';
payload: SerializableLogRecord;
}

/** @public */
export type MessageHandler = (payload: NodeMessagePayload | undefined) => void;

Expand Down
6 changes: 5 additions & 1 deletion src/core/server/node/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
* Side Public License, v 1.
*/

import { TransferBroadcastMessage } from './types';
import { LogRecordMessage, TransferBroadcastMessage } from './types';

export const isBroadcastMessage = (message: any): message is TransferBroadcastMessage => {
return message._kind === 'kibana-broadcast';
};

export const isLogRecordMessage = (message: any): message is LogRecordMessage => {
return message._kind === 'kibana-log-record';
};
4 changes: 2 additions & 2 deletions src/core/server/root/coordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ export class KibanaCoordinator implements KibanaRoot {
private readonly nodeInfo: NodeInfo,
private readonly onShutdown?: (reason?: Error | string) => void
) {
this.loggingSystem = new LoggingSystem();
this.loggingSystem = new LoggingSystem(this.nodeInfo);
this.logger = this.loggingSystem.asLoggerFactory();
this.configService = new ConfigService(rawConfigProvider, this.env, this.logger);

this.log = this.logger.get('node', 'coordinator');
this.nodeManager = new NodeManager(this.configService, this.log);
this.nodeManager = new NodeManager(this.configService, this.logger, this.log);
}

async setup() {
Expand Down
2 changes: 1 addition & 1 deletion src/core/server/root/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class KibanaWorker implements KibanaRoot {
private readonly nodeInfo: NodeInfo,
private readonly onShutdown?: (reason?: Error | string) => void
) {
this.loggingSystem = new LoggingSystem();
this.loggingSystem = new LoggingSystem(this.nodeInfo);
this.logger = this.loggingSystem.asLoggerFactory();
this.log = this.logger.get('node', 'worker');
this.server = new Server(this.rawConfigService, this.env, this.loggingSystem);
Expand Down

0 comments on commit 5368bf7

Please sign in to comment.