Skip to content

Commit

Permalink
first clustering draft
Browse files Browse the repository at this point in the history
  • Loading branch information
pgayvallet committed Mar 3, 2021
1 parent 5afe844 commit d0b619d
Show file tree
Hide file tree
Showing 13 changed files with 610 additions and 33 deletions.
64 changes: 31 additions & 33 deletions src/core/server/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

import chalk from 'chalk';
import { CliArgs, Env, RawConfigService } from './config';
import { Root } from './root';
import { CriticalError } from './errors';
import { getClusteringInfo } from './clustering';
import { KibanaCoordinator } from './root/coordinator';
import { KibanaWorker } from './root/worker';
import { KibanaRoot } from './root/types';

interface KibanaFeatures {
// Indicates whether we can run Kibana in dev mode in which Kibana is run as
Expand Down Expand Up @@ -49,46 +52,43 @@ export async function bootstrap({
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { REPO_ROOT } = require('@kbn/utils');

const rawConfigService = new RawConfigService(configs, applyConfigOverrides);
rawConfigService.loadConfig();

const clusterInfo = await getClusteringInfo(rawConfigService);
const isDevCliParent =
cliArgs.dev && features.isCliDevModeSupported && !process.env.isDevCliChild;

const env = Env.createDefault(REPO_ROOT, {
// TODO: do we want to add clusterInfo to Env ?
configs,
cliArgs,
isDevCliParent: cliArgs.dev && features.isCliDevModeSupported && !process.env.isDevCliChild,
isDevCliParent,
});

const rawConfigService = new RawConfigService(env.configs, applyConfigOverrides);
rawConfigService.loadConfig();

const root = new Root(rawConfigService, env, onRootShutdown);

process.on('SIGHUP', () => reloadLoggingConfig());

// This is only used by the LogRotator service
// in order to be able to reload the log configuration
// under the cluster mode
process.on('message', (msg) => {
if (!msg || msg.reloadLoggingConfig !== true) {
return;
}

reloadLoggingConfig();
});
let root: KibanaRoot;
if (clusterInfo.isCoordinator && !isDevCliParent) {
root = new KibanaCoordinator(rawConfigService, env, clusterInfo, onRootShutdown);
} else {
root = new KibanaWorker(rawConfigService, env, clusterInfo, onRootShutdown);
}

function reloadLoggingConfig() {
const cliLogger = root.logger.get('cli');
cliLogger.info('Reloading logging configuration due to SIGHUP.', { tags: ['config'] });
if (clusterInfo.isMaster) {
process.on('SIGHUP', () => root.reloadLoggingConfig());

try {
rawConfigService.reloadConfig();
} catch (err) {
return shutdown(err);
}
// This is only used by the legacy LogRotator service
// in order to be able to reload the log configuration
// under the cluster mode
process.on('message', (msg) => {
if (msg?.reloadLoggingConfig === true) {
root.reloadLoggingConfig();
}
});

cliLogger.info('Reloaded logging configuration due to SIGHUP.', { tags: ['config'] });
process.on('SIGINT', () => root.shutdown());
process.on('SIGTERM', () => root.shutdown());
}

process.on('SIGINT', () => shutdown());
process.on('SIGTERM', () => shutdown());

function shutdown(reason?: Error) {
rawConfigService.stop();
return root.shutdown(reason);
Expand All @@ -109,9 +109,7 @@ function onRootShutdown(reason?: any) {
// mirror such fatal errors in standard output with `console.error`.
// eslint-disable-next-line
console.error(`\n${chalk.white.bgRed(' FATAL ')} ${reason}\n`);

process.exit(reason instanceof CriticalError ? reason.processExitCode : 1);
}

process.exit(0);
}
110 changes: 110 additions & 0 deletions src/core/server/clustering/cluster_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import cluster from 'cluster';
import { Logger, LoggerFactory } from '@kbn/logging';
import { ConfigService } from '../config';
import { ClusteringConfigType, config as clusteringConfig } from './clustering_config';
import { TransferBroadcastMessage } from './types';
import { isBroadcastMessage } from './utils';

/**
* Coordinator-side clustering service
*/
export class ClusterManager {
private config?: ClusteringConfigType;
private readonly logger: Logger;

constructor(private readonly configService: ConfigService, logger: LoggerFactory) {
this.logger = logger.get('cluster-manager');
}

public async setup() {
this.config = this.configService.atPathSync<ClusteringConfigType>(clusteringConfig.path);
if (this.config.enabled && cluster.isMaster) {
this.forkWorkers();
}
}

public async stopWorkers() {
try {
await shutdownWorkers();
} catch (e) {
await killWorkers('SIGTERM');
await killWorkers('SIGKILL');
}
}

public broadcast(message: TransferBroadcastMessage, sender?: number) {
const sendToSelf = message.options?.sendToSelf ?? false;
Object.values(cluster.workers).forEach((worker) => {
if (sendToSelf || worker?.id !== sender) {
worker?.send(message);
}
});
}

private forkWorkers() {
const handleWorkerMessage = (workerId: number, message: any) => {
if (isBroadcastMessage(message)) {
this.broadcast(message, workerId);
}
};

const createWorker = () => {
const worker = cluster.fork({});
worker.on('message', (message: any) => {
handleWorkerMessage(worker.id, message);
});
};

cluster.on('online', (worker) => {
this.logger.info(`*** Worker online: ${worker.id}`);
});

cluster.on('exit', (worker, code) => {
if (worker.exitedAfterDisconnect || code === 0) {
// shutting down
} else if (true /* closing */) {
// died while closing
} else {
// died needs restart:
createWorker();
}
});

for (let i = 0; i < this.config!.workers; i++) {
createWorker();
}
}
}

async function shutdownWorkers() {
const workers = Object.values(cluster.workers).filter((worker) => !worker!.isDead());
return Promise.all(
workers.map((worker) => {
return new Promise((resolve, reject) => {
worker!.once('exit', (code) => code !== 0 && reject());
worker!.once('disconnect', () => resolve(void 0));
worker!.send({ type: 'shutdown-worker' });
});
})
);
}

async function killWorkers(signal: 'SIGTERM' | 'SIGKILL') {
const workers = Object.values(cluster.workers).filter((worker) => !worker!.isDead());
return Promise.all(
workers.map((worker) => {
return new Promise((resolve) => {
worker!.once('exit', () => resolve(void 0));
worker!.process.kill(signal);
});
})
);
}
19 changes: 19 additions & 0 deletions src/core/server/clustering/clustering_config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { schema, TypeOf } from '@kbn/config-schema';

export const config = {
path: 'clustering',
schema: schema.object({
enabled: schema.boolean({ defaultValue: true }),
workers: schema.number({ defaultValue: 2 }),
}),
};

export type ClusteringConfigType = TypeOf<typeof config.schema>;
93 changes: 93 additions & 0 deletions src/core/server/clustering/clustering_service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import cluster from 'cluster';
import { take } from 'rxjs/operators';
import { Logger } from '@kbn/logging';
import { IConfigService } from '@kbn/config';
import { CoreContext } from '../core_context';
import { config as clusteringConfig, ClusteringConfigType } from './clustering_config';
import {
ClusterMessagePayload,
BroadcastOptions,
TransferBroadcastMessage,
MessageHandler,
MessageHandlerUnsubscribeFn,
} from './types';
import { isBroadcastMessage } from './utils';

/**
* @public
*/
export interface ClusteringServiceSetup {
isEnabled: () => boolean;
getWorkerId: () => number;
broadcast: (type: string, payload?: ClusterMessagePayload, options?: BroadcastOptions) => void;
addMessageHandler: (type: string, handler: MessageHandler) => MessageHandlerUnsubscribeFn;
// TODO: isMainWorker
}

/**
* Worker-side clustering service
*/
export class ClusteringService {
private readonly log: Logger;
private readonly configService: IConfigService;
private readonly messageHandlers = new Map<string, MessageHandler[]>();

constructor(coreContext: CoreContext) {
this.log = coreContext.logger.get('clustering');
this.configService = coreContext.configService;
}

public async setup(): Promise<ClusteringServiceSetup> {
const config = await this.configService
.atPath<ClusteringConfigType>(clusteringConfig.path)
.pipe(take(1))
.toPromise();
const enabled = config.enabled && cluster.isWorker;

process.on('message', (message) => {
if (isBroadcastMessage(message)) {
this.handleMessage(message);
}
});

return {
isEnabled: () => enabled,
getWorkerId: () => (enabled ? cluster.worker.id : -1),
broadcast: (type, payload, options) => this.broadcast(type, payload, options),
addMessageHandler: (type, handler) => {
this.messageHandlers.set(type, [...(this.messageHandlers.get(type) || []), handler]);
return () => {
this.messageHandlers.set(
type,
this.messageHandlers.get(type)!.filter((h) => h !== handler)
);
};
},
};
}

private handleMessage(message: TransferBroadcastMessage) {
this.log.debug(`Received message of type ${message.type}`);
const handlers = this.messageHandlers.get(message.type) || [];
handlers.forEach((handler) => {
handler(message.payload);
});
}

private broadcast(type: string, payload?: ClusterMessagePayload, options?: BroadcastOptions) {
process.send!({
_kind: 'kibana-broadcast',
type,
payload,
options,
});
}
}
46 changes: 46 additions & 0 deletions src/core/server/clustering/get_clustering_info.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import cluster from 'cluster';
import { RawConfigService } from '../config';
import { loadClusteringConfig } from './load_config';

/**
* @internal
*/
export interface ClusteringInfo {
/** is clustering enabled */
isEnabled: boolean;
/**
* Is the current process the master process.
* Will be true either when clustering is disabled or when the process is the coordinator
*/
isMaster: boolean;
/**
* Is the current process the coordinator process
* (master process when clustering is enabled)
*/
isCoordinator: boolean;
/** */
isWorker: boolean;
/** */
workerId: number;
}

export const getClusteringInfo = async (
rawConfigService: RawConfigService
): Promise<ClusteringInfo> => {
const config = await loadClusteringConfig(rawConfigService);
return {
isEnabled: config.enabled,
isMaster: cluster.isMaster,
isCoordinator: config.enabled && cluster.isMaster,
isWorker: cluster.isWorker,
workerId: cluster.isWorker ? cluster.worker.id : -1,
};
};
12 changes: 12 additions & 0 deletions src/core/server/clustering/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export { getClusteringInfo, ClusteringInfo } from './get_clustering_info';
export { ClusteringService } from './clustering_service';
export { ClusterManager } from './cluster_manager';
export { config as clusteringConfig } from './clustering_config';
Loading

0 comments on commit d0b619d

Please sign in to comment.