Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] enable NodeJS clustering on Kibana #93380

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 31 additions & 33 deletions src/core/server/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

import chalk from 'chalk';
import { CliArgs, Env, RawConfigService } from './config';
import { Root } from './root';
import { CriticalError } from './errors';
import { getClusteringInfo } from './clustering';
import { KibanaCoordinator } from './root/coordinator';
import { KibanaWorker } from './root/worker';
import { KibanaRoot } from './root/types';

interface KibanaFeatures {
// Indicates whether we can run Kibana in dev mode in which Kibana is run as
Expand Down Expand Up @@ -49,46 +52,43 @@ export async function bootstrap({
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { REPO_ROOT } = require('@kbn/utils');

const rawConfigService = new RawConfigService(configs, applyConfigOverrides);
rawConfigService.loadConfig();

const clusterInfo = await getClusteringInfo(rawConfigService);
const isDevCliParent =
cliArgs.dev && features.isCliDevModeSupported && !process.env.isDevCliChild;

const env = Env.createDefault(REPO_ROOT, {
// TODO: do we want to add clusterInfo to Env ?
configs,
cliArgs,
isDevCliParent: cliArgs.dev && features.isCliDevModeSupported && !process.env.isDevCliChild,
isDevCliParent,
});

const rawConfigService = new RawConfigService(env.configs, applyConfigOverrides);
rawConfigService.loadConfig();

const root = new Root(rawConfigService, env, onRootShutdown);

process.on('SIGHUP', () => reloadLoggingConfig());

// This is only used by the LogRotator service
// in order to be able to reload the log configuration
// under the cluster mode
process.on('message', (msg) => {
if (!msg || msg.reloadLoggingConfig !== true) {
return;
}

reloadLoggingConfig();
});
let root: KibanaRoot;
if (clusterInfo.isCoordinator && !isDevCliParent) {
root = new KibanaCoordinator(rawConfigService, env, clusterInfo, onRootShutdown);
} else {
root = new KibanaWorker(rawConfigService, env, clusterInfo, onRootShutdown);
}

function reloadLoggingConfig() {
const cliLogger = root.logger.get('cli');
cliLogger.info('Reloading logging configuration due to SIGHUP.', { tags: ['config'] });
if (clusterInfo.isMaster) {
process.on('SIGHUP', () => root.reloadLoggingConfig());

try {
rawConfigService.reloadConfig();
} catch (err) {
return shutdown(err);
}
// This is only used by the legacy LogRotator service
// in order to be able to reload the log configuration
// under the cluster mode
process.on('message', (msg) => {
if (msg?.reloadLoggingConfig === true) {
root.reloadLoggingConfig();
}
});

cliLogger.info('Reloaded logging configuration due to SIGHUP.', { tags: ['config'] });
process.on('SIGINT', () => root.shutdown());
process.on('SIGTERM', () => root.shutdown());
}

process.on('SIGINT', () => shutdown());
process.on('SIGTERM', () => shutdown());

function shutdown(reason?: Error) {
rawConfigService.stop();
return root.shutdown(reason);
Expand All @@ -109,9 +109,7 @@ function onRootShutdown(reason?: any) {
// mirror such fatal errors in standard output with `console.error`.
// eslint-disable-next-line
console.error(`\n${chalk.white.bgRed(' FATAL ')} ${reason}\n`);

process.exit(reason instanceof CriticalError ? reason.processExitCode : 1);
}

process.exit(0);
}
110 changes: 110 additions & 0 deletions src/core/server/clustering/cluster_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import cluster from 'cluster';
import { Logger, LoggerFactory } from '@kbn/logging';
import { ConfigService } from '../config';
import { ClusteringConfigType, config as clusteringConfig } from './clustering_config';
import { TransferBroadcastMessage } from './types';
import { isBroadcastMessage } from './utils';

/**
* Coordinator-side clustering service
*/
export class ClusterManager {
private config?: ClusteringConfigType;
private readonly logger: Logger;

constructor(private readonly configService: ConfigService, logger: LoggerFactory) {
this.logger = logger.get('cluster-manager');
}

public async setup() {
this.config = this.configService.atPathSync<ClusteringConfigType>(clusteringConfig.path);
if (this.config.enabled && cluster.isMaster) {
this.forkWorkers();
}
}

public async stopWorkers() {
try {
await shutdownWorkers();
} catch (e) {
await killWorkers('SIGTERM');
await killWorkers('SIGKILL');
}
}

public broadcast(message: TransferBroadcastMessage, sender?: number) {
const sendToSelf = message.options?.sendToSelf ?? false;
Object.values(cluster.workers).forEach((worker) => {
if (sendToSelf || worker?.id !== sender) {
worker?.send(message);
}
});
}

private forkWorkers() {
const handleWorkerMessage = (workerId: number, message: any) => {
if (isBroadcastMessage(message)) {
this.broadcast(message, workerId);
}
};

const createWorker = () => {
const worker = cluster.fork({});
worker.on('message', (message: any) => {
handleWorkerMessage(worker.id, message);
});
};

cluster.on('online', (worker) => {
this.logger.info(`*** Worker online: ${worker.id}`);
});

cluster.on('exit', (worker, code) => {
if (worker.exitedAfterDisconnect || code === 0) {
// shutting down
} else if (true /* closing */) {
// died while closing
} else {
// died needs restart:
createWorker();
}
});

for (let i = 0; i < this.config!.workers; i++) {
createWorker();
}
}
}

async function shutdownWorkers() {
const workers = Object.values(cluster.workers).filter((worker) => !worker!.isDead());
return Promise.all(
workers.map((worker) => {
return new Promise((resolve, reject) => {
worker!.once('exit', (code) => code !== 0 && reject());
worker!.once('disconnect', () => resolve(void 0));
worker!.send({ type: 'shutdown-worker' });
});
})
);
}

async function killWorkers(signal: 'SIGTERM' | 'SIGKILL') {
const workers = Object.values(cluster.workers).filter((worker) => !worker!.isDead());
return Promise.all(
workers.map((worker) => {
return new Promise((resolve) => {
worker!.once('exit', () => resolve(void 0));
worker!.process.kill(signal);
});
})
);
}
19 changes: 19 additions & 0 deletions src/core/server/clustering/clustering_config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { schema, TypeOf } from '@kbn/config-schema';

export const config = {
path: 'clustering',
schema: schema.object({
enabled: schema.boolean({ defaultValue: true }),
workers: schema.number({ defaultValue: 2 }),
}),
};

export type ClusteringConfigType = TypeOf<typeof config.schema>;
93 changes: 93 additions & 0 deletions src/core/server/clustering/clustering_service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import cluster from 'cluster';
import { take } from 'rxjs/operators';
import { Logger } from '@kbn/logging';
import { IConfigService } from '@kbn/config';
import { CoreContext } from '../core_context';
import { config as clusteringConfig, ClusteringConfigType } from './clustering_config';
import {
ClusterMessagePayload,
BroadcastOptions,
TransferBroadcastMessage,
MessageHandler,
MessageHandlerUnsubscribeFn,
} from './types';
import { isBroadcastMessage } from './utils';

/**
* @public
*/
export interface ClusteringServiceSetup {
isEnabled: () => boolean;
getWorkerId: () => number;
broadcast: (type: string, payload?: ClusterMessagePayload, options?: BroadcastOptions) => void;
addMessageHandler: (type: string, handler: MessageHandler) => MessageHandlerUnsubscribeFn;
// TODO: isMainWorker
}

/**
* Worker-side clustering service
*/
export class ClusteringService {
private readonly log: Logger;
private readonly configService: IConfigService;
private readonly messageHandlers = new Map<string, MessageHandler[]>();

constructor(coreContext: CoreContext) {
this.log = coreContext.logger.get('clustering');
this.configService = coreContext.configService;
}

public async setup(): Promise<ClusteringServiceSetup> {
const config = await this.configService
.atPath<ClusteringConfigType>(clusteringConfig.path)
.pipe(take(1))
.toPromise();
const enabled = config.enabled && cluster.isWorker;

process.on('message', (message) => {
if (isBroadcastMessage(message)) {
this.handleMessage(message);
}
});

return {
isEnabled: () => enabled,
getWorkerId: () => (enabled ? cluster.worker.id : -1),
broadcast: (type, payload, options) => this.broadcast(type, payload, options),
addMessageHandler: (type, handler) => {
this.messageHandlers.set(type, [...(this.messageHandlers.get(type) || []), handler]);
return () => {
this.messageHandlers.set(
type,
this.messageHandlers.get(type)!.filter((h) => h !== handler)
);
};
},
};
}

private handleMessage(message: TransferBroadcastMessage) {
this.log.debug(`Received message of type ${message.type}`);
const handlers = this.messageHandlers.get(message.type) || [];
handlers.forEach((handler) => {
handler(message.payload);
});
}

private broadcast(type: string, payload?: ClusterMessagePayload, options?: BroadcastOptions) {
process.send!({
_kind: 'kibana-broadcast',
type,
payload,
options,
});
}
}
46 changes: 46 additions & 0 deletions src/core/server/clustering/get_clustering_info.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import cluster from 'cluster';
import { RawConfigService } from '../config';
import { loadClusteringConfig } from './load_config';

/**
* @internal
*/
export interface ClusteringInfo {
/** is clustering enabled */
isEnabled: boolean;
/**
* Is the current process the master process.
* Will be true either when clustering is disabled or when the process is the coordinator
*/
isMaster: boolean;
/**
* Is the current process the coordinator process
* (master process when clustering is enabled)
*/
isCoordinator: boolean;
/** */
isWorker: boolean;
/** */
workerId: number;
}

export const getClusteringInfo = async (
rawConfigService: RawConfigService
): Promise<ClusteringInfo> => {
const config = await loadClusteringConfig(rawConfigService);
return {
isEnabled: config.enabled,
isMaster: cluster.isMaster,
isCoordinator: config.enabled && cluster.isMaster,
isWorker: cluster.isWorker,
workerId: cluster.isWorker ? cluster.worker.id : -1,
};
};
12 changes: 12 additions & 0 deletions src/core/server/clustering/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export { getClusteringInfo, ClusteringInfo } from './get_clustering_info';
export { ClusteringService } from './clustering_service';
export { ClusterManager } from './cluster_manager';
export { config as clusteringConfig } from './clustering_config';
Loading