diff --git a/src/core/server/bootstrap.ts b/src/core/server/bootstrap.ts index 42f6d9aedf1d6..093eaf9208234 100644 --- a/src/core/server/bootstrap.ts +++ b/src/core/server/bootstrap.ts @@ -8,8 +8,11 @@ import chalk from 'chalk'; import { CliArgs, Env, RawConfigService } from './config'; -import { Root } from './root'; import { CriticalError } from './errors'; +import { getClusteringInfo } from './clustering'; +import { KibanaCoordinator } from './root/coordinator'; +import { KibanaWorker } from './root/worker'; +import { KibanaRoot } from './root/types'; interface KibanaFeatures { // Indicates whether we can run Kibana in dev mode in which Kibana is run as @@ -49,46 +52,43 @@ export async function bootstrap({ // eslint-disable-next-line @typescript-eslint/no-var-requires const { REPO_ROOT } = require('@kbn/utils'); + const rawConfigService = new RawConfigService(configs, applyConfigOverrides); + rawConfigService.loadConfig(); + + const clusterInfo = await getClusteringInfo(rawConfigService); + const isDevCliParent = + cliArgs.dev && features.isCliDevModeSupported && !process.env.isDevCliChild; + const env = Env.createDefault(REPO_ROOT, { + // TODO: do we want to add clusterInfo to Env ? configs, cliArgs, - isDevCliParent: cliArgs.dev && features.isCliDevModeSupported && !process.env.isDevCliChild, + isDevCliParent, }); - const rawConfigService = new RawConfigService(env.configs, applyConfigOverrides); - rawConfigService.loadConfig(); - - const root = new Root(rawConfigService, env, onRootShutdown); - - process.on('SIGHUP', () => reloadLoggingConfig()); - - // This is only used by the LogRotator service - // in order to be able to reload the log configuration - // under the cluster mode - process.on('message', (msg) => { - if (!msg || msg.reloadLoggingConfig !== true) { - return; - } - - reloadLoggingConfig(); - }); + let root: KibanaRoot; + if (clusterInfo.isCoordinator && !isDevCliParent) { + root = new KibanaCoordinator(rawConfigService, env, clusterInfo, onRootShutdown); + } else { + root = new KibanaWorker(rawConfigService, env, clusterInfo, onRootShutdown); + } - function reloadLoggingConfig() { - const cliLogger = root.logger.get('cli'); - cliLogger.info('Reloading logging configuration due to SIGHUP.', { tags: ['config'] }); + if (clusterInfo.isMaster) { + process.on('SIGHUP', () => root.reloadLoggingConfig()); - try { - rawConfigService.reloadConfig(); - } catch (err) { - return shutdown(err); - } + // This is only used by the legacy LogRotator service + // in order to be able to reload the log configuration + // under the cluster mode + process.on('message', (msg) => { + if (msg?.reloadLoggingConfig === true) { + root.reloadLoggingConfig(); + } + }); - cliLogger.info('Reloaded logging configuration due to SIGHUP.', { tags: ['config'] }); + process.on('SIGINT', () => root.shutdown()); + process.on('SIGTERM', () => root.shutdown()); } - process.on('SIGINT', () => shutdown()); - process.on('SIGTERM', () => shutdown()); - function shutdown(reason?: Error) { rawConfigService.stop(); return root.shutdown(reason); @@ -109,9 +109,7 @@ function onRootShutdown(reason?: any) { // mirror such fatal errors in standard output with `console.error`. // eslint-disable-next-line console.error(`\n${chalk.white.bgRed(' FATAL ')} ${reason}\n`); - process.exit(reason instanceof CriticalError ? reason.processExitCode : 1); } - process.exit(0); } diff --git a/src/core/server/clustering/cluster_manager.ts b/src/core/server/clustering/cluster_manager.ts new file mode 100644 index 0000000000000..f1143b379b747 --- /dev/null +++ b/src/core/server/clustering/cluster_manager.ts @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import cluster from 'cluster'; +import { Logger, LoggerFactory } from '@kbn/logging'; +import { ConfigService } from '../config'; +import { ClusteringConfigType, config as clusteringConfig } from './clustering_config'; +import { TransferBroadcastMessage } from './types'; +import { isBroadcastMessage } from './utils'; + +/** + * Coordinator-side clustering service + */ +export class ClusterManager { + private config?: ClusteringConfigType; + private readonly logger: Logger; + + constructor(private readonly configService: ConfigService, logger: LoggerFactory) { + this.logger = logger.get('cluster-manager'); + } + + public async setup() { + this.config = this.configService.atPathSync(clusteringConfig.path); + if (this.config.enabled && cluster.isMaster) { + this.forkWorkers(); + } + } + + public async stopWorkers() { + try { + await shutdownWorkers(); + } catch (e) { + await killWorkers('SIGTERM'); + await killWorkers('SIGKILL'); + } + } + + public broadcast(message: TransferBroadcastMessage, sender?: number) { + const sendToSelf = message.options?.sendToSelf ?? false; + Object.values(cluster.workers).forEach((worker) => { + if (sendToSelf || worker?.id !== sender) { + worker?.send(message); + } + }); + } + + private forkWorkers() { + const handleWorkerMessage = (workerId: number, message: any) => { + if (isBroadcastMessage(message)) { + this.broadcast(message, workerId); + } + }; + + const createWorker = () => { + const worker = cluster.fork({}); + worker.on('message', (message: any) => { + handleWorkerMessage(worker.id, message); + }); + }; + + cluster.on('online', (worker) => { + this.logger.info(`*** Worker online: ${worker.id}`); + }); + + cluster.on('exit', (worker, code) => { + if (worker.exitedAfterDisconnect || code === 0) { + // shutting down + } else if (true /* closing */) { + // died while closing + } else { + // died needs restart: + createWorker(); + } + }); + + for (let i = 0; i < this.config!.workers; i++) { + createWorker(); + } + } +} + +async function shutdownWorkers() { + const workers = Object.values(cluster.workers).filter((worker) => !worker!.isDead()); + return Promise.all( + workers.map((worker) => { + return new Promise((resolve, reject) => { + worker!.once('exit', (code) => code !== 0 && reject()); + worker!.once('disconnect', () => resolve(void 0)); + worker!.send({ type: 'shutdown-worker' }); + }); + }) + ); +} + +async function killWorkers(signal: 'SIGTERM' | 'SIGKILL') { + const workers = Object.values(cluster.workers).filter((worker) => !worker!.isDead()); + return Promise.all( + workers.map((worker) => { + return new Promise((resolve) => { + worker!.once('exit', () => resolve(void 0)); + worker!.process.kill(signal); + }); + }) + ); +} diff --git a/src/core/server/clustering/clustering_config.ts b/src/core/server/clustering/clustering_config.ts new file mode 100644 index 0000000000000..d332f38622b89 --- /dev/null +++ b/src/core/server/clustering/clustering_config.ts @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { schema, TypeOf } from '@kbn/config-schema'; + +export const config = { + path: 'clustering', + schema: schema.object({ + enabled: schema.boolean({ defaultValue: true }), + workers: schema.number({ defaultValue: 2 }), + }), +}; + +export type ClusteringConfigType = TypeOf; diff --git a/src/core/server/clustering/clustering_service.ts b/src/core/server/clustering/clustering_service.ts new file mode 100644 index 0000000000000..563cd5c9f97d9 --- /dev/null +++ b/src/core/server/clustering/clustering_service.ts @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import cluster from 'cluster'; +import { take } from 'rxjs/operators'; +import { Logger } from '@kbn/logging'; +import { IConfigService } from '@kbn/config'; +import { CoreContext } from '../core_context'; +import { config as clusteringConfig, ClusteringConfigType } from './clustering_config'; +import { + ClusterMessagePayload, + BroadcastOptions, + TransferBroadcastMessage, + MessageHandler, + MessageHandlerUnsubscribeFn, +} from './types'; +import { isBroadcastMessage } from './utils'; + +/** + * @public + */ +export interface ClusteringServiceSetup { + isEnabled: () => boolean; + getWorkerId: () => number; + broadcast: (type: string, payload?: ClusterMessagePayload, options?: BroadcastOptions) => void; + addMessageHandler: (type: string, handler: MessageHandler) => MessageHandlerUnsubscribeFn; + // TODO: isMainWorker +} + +/** + * Worker-side clustering service + */ +export class ClusteringService { + private readonly log: Logger; + private readonly configService: IConfigService; + private readonly messageHandlers = new Map(); + + constructor(coreContext: CoreContext) { + this.log = coreContext.logger.get('clustering'); + this.configService = coreContext.configService; + } + + public async setup(): Promise { + const config = await this.configService + .atPath(clusteringConfig.path) + .pipe(take(1)) + .toPromise(); + const enabled = config.enabled && cluster.isWorker; + + process.on('message', (message) => { + if (isBroadcastMessage(message)) { + this.handleMessage(message); + } + }); + + return { + isEnabled: () => enabled, + getWorkerId: () => (enabled ? cluster.worker.id : -1), + broadcast: (type, payload, options) => this.broadcast(type, payload, options), + addMessageHandler: (type, handler) => { + this.messageHandlers.set(type, [...(this.messageHandlers.get(type) || []), handler]); + return () => { + this.messageHandlers.set( + type, + this.messageHandlers.get(type)!.filter((h) => h !== handler) + ); + }; + }, + }; + } + + private handleMessage(message: TransferBroadcastMessage) { + this.log.debug(`Received message of type ${message.type}`); + const handlers = this.messageHandlers.get(message.type) || []; + handlers.forEach((handler) => { + handler(message.payload); + }); + } + + private broadcast(type: string, payload?: ClusterMessagePayload, options?: BroadcastOptions) { + process.send!({ + _kind: 'kibana-broadcast', + type, + payload, + options, + }); + } +} diff --git a/src/core/server/clustering/get_clustering_info.ts b/src/core/server/clustering/get_clustering_info.ts new file mode 100644 index 0000000000000..773cd6fbc0294 --- /dev/null +++ b/src/core/server/clustering/get_clustering_info.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import cluster from 'cluster'; +import { RawConfigService } from '../config'; +import { loadClusteringConfig } from './load_config'; + +/** + * @internal + */ +export interface ClusteringInfo { + /** is clustering enabled */ + isEnabled: boolean; + /** + * Is the current process the master process. + * Will be true either when clustering is disabled or when the process is the coordinator + */ + isMaster: boolean; + /** + * Is the current process the coordinator process + * (master process when clustering is enabled) + */ + isCoordinator: boolean; + /** */ + isWorker: boolean; + /** */ + workerId: number; +} + +export const getClusteringInfo = async ( + rawConfigService: RawConfigService +): Promise => { + const config = await loadClusteringConfig(rawConfigService); + return { + isEnabled: config.enabled, + isMaster: cluster.isMaster, + isCoordinator: config.enabled && cluster.isMaster, + isWorker: cluster.isWorker, + workerId: cluster.isWorker ? cluster.worker.id : -1, + }; +}; diff --git a/src/core/server/clustering/index.ts b/src/core/server/clustering/index.ts new file mode 100644 index 0000000000000..311d0a95c5b49 --- /dev/null +++ b/src/core/server/clustering/index.ts @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export { getClusteringInfo, ClusteringInfo } from './get_clustering_info'; +export { ClusteringService } from './clustering_service'; +export { ClusterManager } from './cluster_manager'; +export { config as clusteringConfig } from './clustering_config'; diff --git a/src/core/server/clustering/load_config.ts b/src/core/server/clustering/load_config.ts new file mode 100644 index 0000000000000..b65c4a1e31a61 --- /dev/null +++ b/src/core/server/clustering/load_config.ts @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { take } from 'rxjs/operators'; +import { RawConfigService } from '../config'; +import { ClusteringConfigType, config } from './clustering_config'; + +export const loadClusteringConfig = async ( + rawConfigService: RawConfigService +): Promise => { + const rawConfig = await rawConfigService.getConfig$().pipe(take(1)).toPromise(); + return config.schema.validate(rawConfig[config.path]); +}; diff --git a/src/core/server/clustering/types.ts b/src/core/server/clustering/types.ts new file mode 100644 index 0000000000000..255ea49a7fdb6 --- /dev/null +++ b/src/core/server/clustering/types.ts @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { Serializable } from '../../types'; + +/** @public */ +export type ClusterMessagePayload = Serializable; + +export interface BroadcastOptions { + /** + * If true, will also send the broadcast message to the worker that sent it. + * Defaults to false. + */ + sendToSelf?: boolean; +} +/** @internal */ +export interface TransferBroadcastMessage { + _kind: 'kibana-broadcast'; + type: string; + payload?: ClusterMessagePayload; + options?: BroadcastOptions; +} + +/** @public */ +export type MessageHandler = (payload: ClusterMessagePayload | undefined) => void; + +/** @public */ +export type MessageHandlerUnsubscribeFn = () => void; diff --git a/src/core/server/clustering/utils.ts b/src/core/server/clustering/utils.ts new file mode 100644 index 0000000000000..a27a3f535cb90 --- /dev/null +++ b/src/core/server/clustering/utils.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { TransferBroadcastMessage } from './types'; + +export const isBroadcastMessage = (message: any): message is TransferBroadcastMessage => { + return message._kind === 'kibana-broadcast'; +}; diff --git a/src/core/server/root/coordinator.ts b/src/core/server/root/coordinator.ts new file mode 100644 index 0000000000000..2d5ab59be51ce --- /dev/null +++ b/src/core/server/root/coordinator.ts @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { Logger, LoggerFactory } from '@kbn/logging'; +import { Subscription } from 'rxjs'; +import { Env, RawConfigurationProvider, ConfigService } from '@kbn/config'; +import { LoggingSystem } from '../logging'; +import { ClusterManager, ClusteringInfo, clusteringConfig } from '../clustering'; +import { KibanaRoot } from './types'; + +/** + * Root class for the Kibana coordinator in clustering mode + */ +export class KibanaCoordinator implements KibanaRoot { + public readonly logger: LoggerFactory; + private readonly log: Logger; + private readonly loggingSystem: LoggingSystem; + private readonly configService: ConfigService; + private clusterManager: ClusterManager; + private loggingConfigSubscription?: Subscription; + + constructor( + rawConfigProvider: RawConfigurationProvider, + private readonly env: Env, + private readonly clusteringInfo: ClusteringInfo, + private readonly onShutdown?: (reason?: Error | string) => void + ) { + this.loggingSystem = new LoggingSystem(); + this.logger = this.loggingSystem.asLoggerFactory(); + this.configService = new ConfigService(rawConfigProvider, env, this.logger); + + this.log = this.logger.get('root'); + this.clusterManager = new ClusterManager(this.configService, this.logger); + } + + async setup() { + this.configService.setSchema(clusteringConfig.path, clusteringConfig.schema); + await this.configService.validate(); + await this.clusterManager.setup(); + } + + async start() { + // nothing for now + } + + async shutdown(reason?: Error) { + if (reason) { + this.log.fatal(reason); + } + + await this.clusterManager.stopWorkers(); + + if (this.onShutdown) { + this.onShutdown(reason); + } + } + + reloadLoggingConfig() { + // TODO: broadcast to all workers + this.clusterManager.broadcast({ + _kind: 'kibana-broadcast', + type: 'reload-logging-config', + }); + } +} diff --git a/src/core/server/root/types.ts b/src/core/server/root/types.ts new file mode 100644 index 0000000000000..bda549c279ff5 --- /dev/null +++ b/src/core/server/root/types.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export interface KibanaRoot { + shutdown(reason?: Error): void; + reloadLoggingConfig(): void; + setup(): Promise; + start(): Promise; +} diff --git a/src/core/server/root/worker.ts b/src/core/server/root/worker.ts new file mode 100644 index 0000000000000..54bdd91ea8757 --- /dev/null +++ b/src/core/server/root/worker.ts @@ -0,0 +1,145 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { Logger, LoggerFactory } from '@kbn/logging'; +import { ConnectableObservable, of, Subscription } from 'rxjs'; +import { Env, RawConfigService } from '@kbn/config'; +import { concatMap, first, publishReplay, switchMap, tap } from 'rxjs/operators'; +import { Server } from '../server'; +import { LoggingConfigType, LoggingSystem } from '../logging'; +import { KibanaRoot } from './types'; +import { ClusteringInfo } from '../clustering'; + +/** + * Root class for workers in clustering mode. + * + * A worker is basically just doing the same thing + */ +export class KibanaWorker implements KibanaRoot { + public readonly logger: LoggerFactory; + private readonly log: Logger; + private readonly loggingSystem: LoggingSystem; + private readonly server: Server; + private loggingConfigSubscription?: Subscription; + + constructor( + private readonly rawConfigService: RawConfigService, + private readonly env: Env, + private readonly clusteringInfo: ClusteringInfo, + private readonly onShutdown?: (reason?: Error | string) => void + ) { + this.loggingSystem = new LoggingSystem(); + this.logger = this.loggingSystem.asLoggerFactory(); + this.log = this.logger.get('root'); + this.server = new Server(rawConfigService, env, this.loggingSystem); + } + + public async setup() { + if (this.clusteringInfo.isEnabled && this.clusteringInfo.isWorker) { + process.on('message', (message) => { + if (message?.type === 'reload-logging-config') { + this.reloadLoggingConfig(); + } + if (message?.type === 'shutdown-worker') { + this.shutdown(); + } + }); + } + + try { + this.server.setupCoreConfig(); + await this.setupLogging(); + this.log.debug('setting up root'); + return await this.server.setup(); + } catch (e) { + await this.shutdown(e); + throw e; + } + } + + public async start() { + this.log.debug('starting root'); + try { + return await this.server.start(); + } catch (e) { + await this.shutdown(e); + throw e; + } + } + + public async shutdown(reason?: any) { + this.log.debug('shutting root down'); + + if (reason) { + if (reason.code === 'EADDRINUSE' && Number.isInteger(reason.port)) { + reason = new Error( + `Port ${reason.port} is already in use. Another instance of Kibana may be running!` + ); + } + this.log.fatal(reason); + } + + await this.server.stop(); + + if (this.loggingConfigSubscription !== undefined) { + this.loggingConfigSubscription.unsubscribe(); + this.loggingConfigSubscription = undefined; + } + await this.loggingSystem.stop(); + + if (this.onShutdown !== undefined) { + this.onShutdown(reason); + } + } + + reloadLoggingConfig() { + const cliLogger = this.logger.get('cli'); + cliLogger.info('Reloading logging configuration due to SIGHUP.', { tags: ['config'] }); + + try { + this.rawConfigService.reloadConfig(); + } catch (err) { + return this.shutdown(err); + } + + cliLogger.info('Reloaded logging configuration due to SIGHUP.', { tags: ['config'] }); + } + + private async setupLogging() { + const { configService } = this.server; + // Stream that maps config updates to logger updates, including update failures. + const update$ = configService.getConfig$().pipe( + // always read the logging config when the underlying config object is re-read + // except for the CLI process where we only apply the default logging config once + switchMap(() => + this.env.isDevCliParent ? of(undefined) : configService.atPath('logging') + ), + concatMap((config) => this.loggingSystem.upgrade(config)), + // This specifically console.logs because we were not able to configure the logger. + // eslint-disable-next-line no-console + tap({ error: (err) => console.error('Configuring logger failed:', err) }), + publishReplay(1) + ) as ConnectableObservable; + + // Subscription and wait for the first update to complete and throw if it fails. + const connectSubscription = update$.connect(); + await update$.pipe(first()).toPromise(); + + // Send subsequent update failures to this.shutdown(), stopped via loggingConfigSubscription. + this.loggingConfigSubscription = update$.subscribe({ + error: (err) => this.shutdown(err), + }); + + // Add subscription we got from `connect` so that we can dispose both of them + // at once. We can't inverse this and add consequent updates subscription to + // the one we got from `connect` because in the error case the latter will be + // automatically disposed before the error is forwarded to the former one so + // the shutdown logic won't be called. + this.loggingConfigSubscription.add(connectSubscription); + } +} diff --git a/src/core/server/server.ts b/src/core/server/server.ts index 337dfa8824303..d4e0def6cfc18 100644 --- a/src/core/server/server.ts +++ b/src/core/server/server.ts @@ -26,6 +26,7 @@ import { CapabilitiesService } from './capabilities'; import { EnvironmentService, config as pidConfig } from './environment'; // do not try to shorten the import to `./status`, it will break server test mocking import { StatusService } from './status/status_service'; +import { ClusteringService, clusteringConfig } from './clustering'; import { config as cspConfig } from './csp'; import { config as elasticsearchConfig } from './elasticsearch'; @@ -67,6 +68,7 @@ export class Server { private readonly coreApp: CoreApp; private readonly coreUsageData: CoreUsageDataService; private readonly i18n: I18nService; + private readonly clustering: ClusteringService; private readonly savedObjectsStartPromise: Promise; private resolveSavedObjectsStartPromise?: (value: SavedObjectsServiceStart) => void; @@ -86,6 +88,7 @@ export class Server { const core = { coreId, configService: this.configService, env, logger: this.logger }; this.context = new ContextService(core); + this.clustering = new ClusteringService(core); this.http = new HttpService(core); this.rendering = new RenderingService(core); this.plugins = new PluginsService(core); @@ -128,6 +131,8 @@ export class Server { await ensureValidConfiguration(this.configService, legacyConfigSetup); } + const clusteringSetup = await this.clustering.setup(); + const contextServiceSetup = this.context.setup({ // We inject a fake "legacy plugin" with dependencies on every plugin so that legacy plugins: // 1) Can access context from any KP plugin @@ -318,6 +323,7 @@ export class Server { statusConfig, pidConfig, i18nConfig, + clusteringConfig, ]; this.configService.addDeprecationProvider(rootConfigPath, coreDeprecationProvider);