[POC] multi-process Kibana with Node clustering #106055

Closed · 5 commits
2 changes: 1 addition & 1 deletion packages/kbn-logging/src/index.ts
@@ -7,7 +7,7 @@
  */
 
 export { LogLevel, LogLevelId } from './log_level';
-export { LogRecord } from './log_record';
+export { LogRecord, SerializableLogRecord } from './log_record';
 export { Logger } from './logger';
 export { LogMeta } from './log_meta';
 export { LoggerFactory } from './logger_factory';
16 changes: 15 additions & 1 deletion packages/kbn-logging/src/log_record.ts
@@ -6,7 +6,7 @@
  * Side Public License, v 1.
  */
 
-import { LogLevel } from './log_level';
+import { LogLevel, LogLevelId } from './log_level';
 
 /**
  * Essential parts of every log message.
@@ -21,3 +21,17 @@ export interface LogRecord {
   meta?: { [name: string]: any };
   pid: number;
 }
+
+/**
+ * Serializable version of a log record, used for IPC.
+ * @internal
+ */
+export interface SerializableLogRecord {
+  timestamp: string; // ISO-8601 timestamp
+  level: LogLevelId;
+  context: string;
+  message: string;
+  error?: { name: string; message: string; stack?: string };
+  meta?: { [name: string]: any };
+  pid: number;
+}
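
Illustrative only, not part of the diff: per the interface above, the payload a worker would put on the IPC channel might look like the following (all values made up):

    const example: SerializableLogRecord = {
      timestamp: '2021-07-19T12:34:56.789Z', // Date flattened to an ISO-8601 string
      level: 'info',                         // LogLevel flattened to its LogLevelId
      context: 'plugins.myPlugin',
      message: 'example log message',
      pid: 12345,
    };
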
47 changes: 29 additions & 18 deletions src/core/server/bootstrap.ts
@@ -8,8 +8,11 @@
 
 import chalk from 'chalk';
 import { CliArgs, Env, RawConfigService } from './config';
-import { Root } from './root';
 import { CriticalError } from './errors';
+import { getNodeInfo } from './node';
+import { KibanaCoordinator } from './root/coordinator';
+import { KibanaWorker } from './root/worker';
+import { KibanaRoot } from './root/types';
 
 interface BootstrapArgs {
   configs: string[];
@@ -37,27 +40,40 @@ export async function bootstrap({ configs, cliArgs, applyConfigOverrides }: Boot
   const { REPO_ROOT } = require('@kbn/utils');
 
   const env = Env.createDefault(REPO_ROOT, {
+    // TODO: do we want to add nodeInfo to Env ?
     configs,
     cliArgs,
   });
 
   const rawConfigService = new RawConfigService(env.configs, applyConfigOverrides);
   rawConfigService.loadConfig();
 
-  const root = new Root(rawConfigService, env, onRootShutdown);
+  const nodeInfo = await getNodeInfo(rawConfigService);
 
-  process.on('SIGHUP', () => reloadConfiguration());
+  let root: KibanaRoot;
+  if (nodeInfo.isCoordinator) {
+    root = new KibanaCoordinator(rawConfigService, env, nodeInfo, onRootShutdown);
+  } else {
+    root = new KibanaWorker(rawConfigService, env, nodeInfo, onRootShutdown);
+  }
 
-  // This is only used by the LogRotator service
-  // in order to be able to reload the log configuration
-  // under the cluster mode
-  process.on('message', (msg) => {
-    if (!msg || msg.reloadConfiguration !== true) {
-      return;
-    }
+  if (nodeInfo.isMaster) {
+    process.on('SIGHUP', () => reloadConfiguration());
 
-    reloadConfiguration();
-  });
+    // this is only used by the logrotator service
+    // in order to be able to reload the log configuration
+    // under the cluster mode
+    process.on('message', (msg) => {
+      if (!msg || msg.reloadConfiguration !== true) {
+        return;
+      }
+
+      reloadConfiguration();
+    });
+
+    process.on('SIGINT', () => root.shutdown());
+    process.on('SIGTERM', () => root.shutdown());
+  }
 
   function reloadConfiguration() {
     const cliLogger = root.logger.get('cli');
@@ -66,15 +82,12 @@ export async function bootstrap({ configs, cliArgs, applyConfigOverrides }: Boot
     try {
       rawConfigService.reloadConfig();
     } catch (err) {
-      return shutdown(err);
+      shutdown(err);
     }
 
     cliLogger.info('Reloaded Kibana configuration due to SIGHUP.', { tags: ['config'] });
   }
 
-  process.on('SIGINT', () => shutdown());
-  process.on('SIGTERM', () => shutdown());
-
   function shutdown(reason?: Error) {
     rawConfigService.stop();
     return root.shutdown(reason);
@@ -100,9 +113,7 @@ function onRootShutdown(reason?: any) {
     // mirror such fatal errors in standard output with `console.error`.
     // eslint-disable-next-line
     console.error(`\n${chalk.white.bgRed(' FATAL ')} ${reason}\n`);
-
     process.exit(reason instanceof CriticalError ? reason.processExitCode : 1);
   }
-
   process.exit(0);
 }
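
The coordinator/worker split above hinges on getNodeInfo(), which this diff does not include. Below is a minimal sketch of what it could derive from Node's built-in cluster module; the NodeInfo fields are inferred from how the diff uses them (isEnabled, isCoordinator, isMaster, workerId), while the derivation logic itself is an assumption, not the PR's implementation:

    import cluster from 'cluster';

    interface NodeInfo {
      isEnabled: boolean;     // clustering enabled in the `node` config (assumed flag)
      isCoordinator: boolean; // the process that forks and supervises workers
      isMaster: boolean;      // coordinator, or the sole process when clustering is off
      workerId?: number;      // defined only on worker processes
    }

    function deriveNodeInfo(isEnabled: boolean): NodeInfo {
      const isCoordinator = isEnabled && cluster.isMaster;
      return {
        isEnabled,
        isCoordinator,
        // A non-clustered Kibana behaves like a master for signal handling.
        isMaster: !isEnabled || isCoordinator,
        workerId: cluster.worker?.id,
      };
    }
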
1 change: 1 addition & 0 deletions src/core/server/config/index.ts
@@ -8,6 +8,7 @@
 
 export { coreDeprecationProvider } from './deprecation';
 export { ensureValidConfiguration } from './ensure_valid_configuration';
+export { setupCoreConfig } from './setup_core_config';
 
 export {
   ConfigService,
63 changes: 63 additions & 0 deletions src/core/server/config/setup_core_config.ts
@@ -0,0 +1,63 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+import { ConfigService } from '@kbn/config';
+import { config as pathConfig } from '@kbn/utils';
+
+import { config as pluginsConfig } from '../plugins';
+import { opsConfig } from '../metrics';
+import { config as pidConfig } from '../environment';
+import { nodeConfig } from '../node';
+
+import { config as cspConfig } from '../csp';
+import { config as elasticsearchConfig } from '../elasticsearch';
+import { config as httpConfig } from '../http';
+import { config as loggingConfig } from '../logging';
+import { config as kibanaConfig } from '../kibana_config';
+import { savedObjectsConfig, savedObjectsMigrationConfig } from '../saved_objects';
+import { config as uiSettingsConfig } from '../ui_settings';
+import { config as statusConfig } from '../status';
+import { config as i18nConfig } from '../i18n';
+import { ServiceConfigDescriptor } from '../internal_types';
+import { config as externalUrlConfig } from '../external_url';
+
+import { coreDeprecationProvider } from './deprecation';
+
+const rootConfigPath = '';
+
+// TODO: This is copied from Server, need to dedupe.
+// We probably don't need to do this in the Server if
+// it's already handled in the coordinator.
+export function setupCoreConfig(configService: ConfigService) {
+  const configDescriptors: Array<ServiceConfigDescriptor<unknown>> = [
+    pathConfig,
+    cspConfig,
+    elasticsearchConfig,
+    externalUrlConfig,
+    loggingConfig,
+    httpConfig,
+    pluginsConfig,
+    kibanaConfig,
+    savedObjectsConfig,
+    savedObjectsMigrationConfig,
+    uiSettingsConfig,
+    opsConfig,
+    statusConfig,
+    pidConfig,
+    i18nConfig,
+    nodeConfig,
+  ];
+
+  configService.addDeprecationProvider(rootConfigPath, coreDeprecationProvider);
+  for (const descriptor of configDescriptors) {
+    if (descriptor.deprecations) {
+      configService.addDeprecationProvider(descriptor.path, descriptor.deprecations);
+    }
+    configService.setSchema(descriptor.path, descriptor.schema);
+  }
+}
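
A usage sketch, assuming the coordinator wires up a ConfigService the same way the Server does; the coordinator itself is outside this diff, and the (rawConfigProvider, env, logger) constructor shape is assumed from @kbn/config of this era:

    import { ConfigService } from '@kbn/config';
    import { setupCoreConfig } from './setup_core_config';

    // rawConfigService, env, and loggerFactory come from the bootstrap code above.
    const configService = new ConfigService(rawConfigService, env, loggerFactory);
    setupCoreConfig(configService); // registers every core schema and deprecation provider
    await configService.validate(); // config can now be validated up front
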
2 changes: 2 additions & 0 deletions src/core/server/internal_types.ts
@@ -27,6 +27,7 @@ import { InternalRenderingServiceSetup } from './rendering';
 import { InternalHttpResourcesSetup } from './http_resources';
 import { InternalStatusServiceSetup } from './status';
 import { InternalLoggingServiceSetup } from './logging';
+import { NodeServiceSetup } from './node';
 import { CoreUsageDataStart } from './core_usage_data';
 import { I18nServiceSetup } from './i18n';
 import { InternalDeprecationsServiceSetup } from './deprecations';
@@ -46,6 +47,7 @@ export interface InternalCoreSetup {
   httpResources: InternalHttpResourcesSetup;
   logging: InternalLoggingServiceSetup;
   metrics: InternalMetricsServiceSetup;
+  node: NodeServiceSetup;
   deprecations: InternalDeprecationsServiceSetup;
 }
 
74 changes: 70 additions & 4 deletions src/core/server/logging/logger.ts
@@ -6,7 +6,16 @@
  * Side Public License, v 1.
  */
 
-import { Appender, LogLevel, LogRecord, LoggerFactory, LogMeta, Logger } from '@kbn/logging';
+import {
+  Appender,
+  LogLevel,
+  LogRecord,
+  LoggerFactory,
+  LogMeta,
+  Logger,
+  SerializableLogRecord,
+} from '@kbn/logging';
+import { NodeInfo } from '../node';
 
 function isError(x: any): x is Error {
   return x instanceof Error;
@@ -18,7 +27,8 @@ export class BaseLogger implements Logger {
     private readonly context: string,
     private readonly level: LogLevel,
     private readonly appenders: Appender[],
-    private readonly factory: LoggerFactory
+    private readonly factory: LoggerFactory,
+    private readonly nodeInfo: NodeInfo
   ) {}
 
   public trace<Meta extends LogMeta = LogMeta>(message: string, meta?: Meta): void {
@@ -50,15 +60,31 @@ export class BaseLogger implements Logger {
       return;
     }
 
-    for (const appender of this.appenders) {
-      appender.append(record);
+    if (this.nodeInfo.isEnabled && !this.nodeInfo.isCoordinator) {
+      // If we are in clustering mode, and this is a worker,
+      // send to the coordinator instead of appending.
+      process.send!({
+        _kind: 'kibana-log-record',
+        payload: BaseLogger.toSerializableLogRecord(record),
+        workerId: this.nodeInfo.workerId,
+      });
+    } else {
+      // We are the coordinator or in non-clustering mode,
+      // so just append the record.
+      this.append(record);
     }
   }
 
   public get(...childContextPaths: string[]): Logger {
     return this.factory.get(...[this.context, ...childContextPaths]);
   }
 
+  private append(record: LogRecord) {
+    for (const appender of this.appenders) {
+      appender.append(record);
    }
+  }
+
   private createLogRecord<Meta extends LogMeta>(
     level: LogLevel,
     errorOrMessage: string | Error,
@@ -85,4 +111,44 @@ export class BaseLogger implements Logger {
       pid: process.pid,
     };
   }
+
+  /**
+   * If clustering is enabled, we need to be able to serialize a LogRecord
+   * so that it can be sent to the coordinator process via IPC.
+   *
+   * Converting each record to and from the serializable format is potentially
+   * expensive; ideally we'd consider changing the `LogRecord` interface into
+   * something fully serializable. Would need to assess how much impact this
+   * could have on the rest of Kibana.
+   */
+  public static toSerializableLogRecord(record: LogRecord): SerializableLogRecord {
+    return {
+      ...record, // TODO: shallow clone might not be necessary
+      level: record.level.id,
+      timestamp:
+        typeof record.timestamp === 'string' ? record.timestamp : record.timestamp.toISOString(),
+      error: record.error
+        ? {
+            message: record.error.message,
+            name: record.error.name,
+            stack: record.error.stack,
+          }
+        : undefined,
+    };
+  }
+
+  public static fromSerializableLogRecord(record: SerializableLogRecord): LogRecord {
+    const error = record.error ? new Error(record.error.message) : undefined;
+    if (error) {
+      error.name = record.error!.name;
+      error.stack = record.error?.stack;
+    }
+
+    return {
+      ...record, // TODO: shallow clone might not be necessary
+      level: LogLevel.fromId(record.level),
+      timestamp: new Date(record.timestamp),
+      error,
+    };
+  }
 }
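
The process.send() call in write() above implies a receiving end that this diff does not show. Here is a minimal sketch of the coordinator side, assuming only the message shape defined by the worker; the cluster wiring and the appender plumbing are guesses, not the PR's code:

    import cluster from 'cluster';
    import { LogRecord } from '@kbn/logging';
    import { BaseLogger } from './logger';

    export function forwardWorkerLogs(append: (record: LogRecord) => void) {
      // Fires for every IPC message any worker sends via process.send().
      cluster.on('message', (worker, msg) => {
        if (!msg || msg._kind !== 'kibana-log-record') {
          return;
        }
        // Rehydrate Date, LogLevel, and Error instances, then reuse the
        // coordinator's own appenders.
        append(BaseLogger.fromSerializableLogRecord(msg.payload));
      });
    }
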
11 changes: 11 additions & 0 deletions src/core/server/logging/logging_config.ts
@@ -119,6 +119,17 @@ export interface LoggerContextConfigInput {
  * @internal
  */
 export class LoggingConfig {
+  /**
+   * Helper method that takes a single context string and splits it into separated context parts.
+   * In case the provided context is an empty string, `root` context name is returned.
+   * @param context The context string (e.g. 'parent.child').
+   * @returns {string[]} Separated context parts (e.g. ['parent', 'child']).
+   */
+  public static getLoggerContextParts(context: string) {
+    const parts = context.split(CONTEXT_SEPARATOR);
+    return context.length ? parts : [ROOT_CONTEXT_NAME];
+  }
+
   /**
    * Helper method that joins separate string context parts into single context string.
    * In case joined context is an empty string, `root` context name is returned.
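
Round-trip sketch for the new helper, mirroring the existing getLoggerContext() in this file:

    LoggingConfig.getLoggerContextParts('parent.child'); // ['parent', 'child']
    LoggingConfig.getLoggerContextParts('');             // ['root']
    // Inverse of the existing join helper:
    LoggingConfig.getLoggerContext(['parent', 'child']); // 'parent.child'
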
19 changes: 16 additions & 3 deletions src/core/server/logging/logging_system.ts
@@ -8,6 +8,7 @@
 
 import type { PublicMethodsOf } from '@kbn/utility-types';
 import { DisposableAppender, LogLevel, Logger, LoggerFactory } from '@kbn/logging';
+import { NodeInfo } from '../node';
 import { Appenders } from './appenders/appenders';
 import { BufferAppender } from './appenders/buffer/buffer_appender';
 import { BaseLogger } from './logger';
@@ -38,7 +39,7 @@ export class LoggingSystem implements LoggerFactory {
   private readonly loggers: Map<string, LoggerAdapter> = new Map();
   private readonly contextConfigs = new Map<string, LoggerContextConfigType>();
 
-  constructor() {}
+  constructor(private readonly nodeInfo: NodeInfo) {}
 
   public get(...contextParts: string[]): Logger {
     const context = LoggingConfig.getLoggerContext(contextParts);
@@ -124,14 +125,26 @@ export class LoggingSystem implements LoggerFactory {
     if (config === undefined) {
       // If we don't have config yet, use `buffered` appender that will store all logged messages in the memory
       // until the config is ready.
-      return new BaseLogger(context, LogLevel.All, [this.bufferAppender], this.asLoggerFactory());
+      return new BaseLogger(
+        context,
+        LogLevel.All,
+        [this.bufferAppender],
+        this.asLoggerFactory(),
+        this.nodeInfo
+      );
     }
 
     const { level, appenders } = this.getLoggerConfigByContext(config, context);
     const loggerLevel = LogLevel.fromId(level);
     const loggerAppenders = appenders.map((appenderKey) => this.appenders.get(appenderKey)!);
 
-    return new BaseLogger(context, loggerLevel, loggerAppenders, this.asLoggerFactory());
+    return new BaseLogger(
+      context,
+      loggerLevel,
+      loggerAppenders,
+      this.asLoggerFactory(),
+      this.nodeInfo
+    );
   }
 
   private getLoggerConfigByContext(config: LoggingConfig, context: string): LoggerConfigType {
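
With the constructor change above, whatever instantiates the logging system now has to thread NodeInfo through; a one-line sketch, with the call site assumed since it is not part of this diff:

    const loggingSystem = new LoggingSystem(nodeInfo); // nodeInfo from getNodeInfo()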