Skip to content

Commit

Permalink
reload config during non-upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
jahudka committed Aug 9, 2023
1 parent 1cfb6d8 commit 8fdf082
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 40 deletions.
6 changes: 6 additions & 0 deletions docs/user/04-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ workers with new ones. In other words, you can simply slap the `--upgrade` flag
onto the `nodesockd restart` command in your CI pipeline (yes, you can combine
it with `--suspend`) and you should be good.

If a `nodesockd restart --upgrade` doesn't result in a daemon restart, the
existing daemon will reload its configuration prior to restarting workers, so
that the observed behaviour is the same regardless of whether the daemon is
upgraded (because if the daemon _is_ upgraded, the new process _must_ load
the configuration files from disk).

Next chapter: [The Nodesockd CLI][4]


Expand Down
9 changes: 7 additions & 2 deletions docs/user/05-cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ document, the path to the `nodesockd` executable is left out for brevity.
daemon was started, and then `extends` options will be resolved as usual,
meaning that the final list of loaded config files may change.

The daemon will apply the new configuration immediately upon issuing this
command. It will try to figure out the smallest set of actions needed to make
the actual state match the configuration, but some changes _will_ lead to
workers being restarted. As usual, this should happen with zero downtime.

**Options:**

- ( `--config=<path>` _or_ `-c <path>` ) _or_ ( `--ipc=<path>` _or_ `-i <path>` )
Expand Down Expand Up @@ -181,8 +186,8 @@ document, the path to the `nodesockd` executable is left out for brevity.

- ### `nodesockd send-req <request> [data]`
This command will send the specified `<request>`, optionally along with any
JSON-serialized `[data]`, to one or more workers, and display the result from
each worker. See the [Messaging][4] section of the Integration page for
JSON-serialized `[data]`, to one or more workers, and display the response(s)
from each worker. See the [Messaging][4] section of the Integration page for
details on how to specify which workers should receive the request and how to
handle requests in workers.

Expand Down
2 changes: 1 addition & 1 deletion src/cli/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export const restart = createCommand(

console.log('Waiting for a new daemon process to come online...');
client = await createClient(ipc, config, true);
await sleep(2000); // ensure adoption period has passed
await sleep(2500); // ensure adoption period has passed
} else {
console.log(`Daemon process with PID ${pid} is already the latest version.`);
return;
Expand Down
39 changes: 29 additions & 10 deletions src/daemon/daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ import { ConsoleHandler } from '@debugr/console';
import { Logger, LogLevel } from '@debugr/core';
import { ZodError } from 'zod';
import {
Config, DaemonConfig,
Config,
DaemonConfig,
DaemonStatus,
getNodesockdVersion,
loadConfig, WorkerRestartReply,
loadConfig,
WorkerRestartReply,
} from '../common';
import { IpcPeer, UnixSocketIpcServer, UnixSocketIpcTransport } from '../ipc';
import { PromiseAborted, sleep } from '../utils';
import { DevServer } from './devServer';
import { ProcessManager } from './processManager';
import { ApplyConfigCb, ProcessManager } from './processManager';
import { DaemonIpcIncomingMap } from './types';
import { lte as isCurrentVersion } from 'semver';

Expand Down Expand Up @@ -99,21 +101,32 @@ export class Daemon {
};
}

async reloadConfig(catchErrors: boolean = false): Promise<void> {
async reloadConfig(catchErrors: boolean = false, cb?: ApplyConfigCb): Promise<void> {
this.logger.info('Reloading daemon config...');

try {
[this.config, this.configFiles] = await loadConfig('/', this.configFiles[0]);
const [config, configFiles] = await loadConfig('/', this.configFiles[0]);

await this.pm.setConfig(config, cb);

[this.config, this.configFiles] = [config, configFiles];
process.title = `${this.config.name} daemon`;
await this.pm.setConfig(this.config);

this.logger.info('Daemon config reloaded successfully');
} catch (e) {
if (!catchErrors || !(e instanceof ZodError)) {
if (!catchErrors) {
throw e;
}

const msg = e.errors.length > 1 ? `\n - ${e.errors.join('\n - ')}` : ` ${e.errors.join('')}`;
this.logger.error(`Failed to reload config file:${msg}`);
if (!(e instanceof ZodError) || e.errors.length < 2) {
this.logger.error(`Failed to reload config file: ${e.message}`);
} else {
this.logger.error('Failed to reload config file');

for (const err of e.errors) {
this.logger.error(`${err.path}: ${err.message}`);
}
}
}
}

Expand All @@ -135,7 +148,13 @@ export class Daemon {

private async handleRestart(suspended?: boolean, maxAttempts?: number, version?: string): Promise<WorkerRestartReply> {
if (version === undefined || isCurrentVersion(version, this.version)) {
await this.pm.restart(suspended, maxAttempts);
if (version !== undefined) {
// upgrade requested, but already on latest version -> make sure we reload configuration:
await this.reloadConfig(false, async () => this.pm.restart(suspended, maxAttempts));
} else {
await this.pm.restart(suspended, maxAttempts);
}

return { pid: process.pid };
} else {
this.logger.warning(`Upgrading daemon to version ${version}...`);
Expand Down
78 changes: 51 additions & 27 deletions src/daemon/processManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import {
consumeAsyncResources,
EventEmitter,
EventMap,
ParallelTask, ParallelTaskFailed,
ParallelTask,
ParallelTaskFailed,
ParallelTaskGroup,
PromiseTimedOut,
shortId,
Expand All @@ -33,6 +34,8 @@ export interface ProcessManagerEvents extends EventMap {
offline: [socketPath: string];
}

export type ApplyConfigCb = (config: Config, orig: Config) => Promise<void>;

export class ProcessManager extends EventEmitter<ProcessManagerEvents> {
private readonly logger: Logger;
private config: Config;
Expand All @@ -51,10 +54,14 @@ export class ProcessManager extends EventEmitter<ProcessManagerEvents> {

async run(): Promise<void> {
this.running = true;
await sleep(2000);
await this.setWorkerCount(this.config.workers);
await this.setStandbyCount(this.config.standby);
await sleep(2000); // allow adopting previous daemon's workers
this.adopting = false;

if (!this.workers.size && !this.workers.standbys) {
// no workers were adopted, meaning we're not inside an upgrade -> lets start our own workers:
this.logger.info('Starting workers...');
await this.startWorkers();
}
}

async start(suspended?: boolean, maxAttempts?: number): Promise<void> {
Expand Down Expand Up @@ -132,29 +139,15 @@ export class ProcessManager extends EventEmitter<ProcessManagerEvents> {
worker.handleBroken(reason);
}

async setConfig(config: Config): Promise<void> {
const actions = compareConfig(this.config, config);
async setConfig(config: Config, cb?: ApplyConfigCb): Promise<void> {
const orig = this.config;
this.config = config;

for (const action of actions) {
switch (action) {
case 'restart':
await this.restart();
break;
case 'set-name':
await Promise.all(this.workers.mapAll(async (worker) => {
if (worker.isInState('running', 'online', 'suspended')) {
await worker.setName(this.config.name);
}
}));
break;
case 'set-workers':
await this.setWorkerCount(this.config.workers);
break;
case 'set-standby':
await this.setStandbyCount(this.config.standby);
break;
}
try {
await (cb ? cb(config, orig) : this.applyConfig(config, orig));
} catch (e) {
this.config = orig;
throw e;
}
}

Expand Down Expand Up @@ -202,14 +195,20 @@ export class ProcessManager extends EventEmitter<ProcessManagerEvents> {
queue.push(this.startWorker(i, suspended, maxAttempts, group.createTask()));
}

const standbys = this.workers.ejectStandby();
const obsolete = [
...this.workers.ejectStandby(),
...this.workers.resolve(`${this.config.workers}-`).map((worker) => {
this.workers.demote(worker);
return worker;
}),
];

for (let i = 0; i < this.config.standby; ++i) {
queue.push(this.startWorker(true, suspended, maxAttempts, group.createTask()));
}

queue.length && await Promise.all(queue);
standbys.length && await Promise.all(standbys.map((standby) => standby.terminate()));
obsolete.length && await Promise.all(obsolete.map((worker) => worker.terminate()));
}

private async startWorker(idxOrStandby: number | true, suspended?: boolean, maxAttempts?: number, task?: ParallelTask): Promise<void> {
Expand Down Expand Up @@ -300,6 +299,31 @@ export class ProcessManager extends EventEmitter<ProcessManagerEvents> {
queue.length && await Promise.all(queue);
}

private async applyConfig(config: Config, orig: Config): Promise<void> {
const actions = compareConfig(orig, config);

for (const action of actions) {
switch (action) {
case 'restart':
await this.restart();
break;
case 'set-name':
await Promise.all(this.workers.mapAll(async (worker) => {
if (worker.isInState('running', 'online', 'suspended')) {
await worker.setName(this.config.name);
}
}));
break;
case 'set-workers':
await this.setWorkerCount(this.config.workers);
break;
case 'set-standby':
await this.setStandbyCount(this.config.standby);
break;
}
}
}

private async handleWorkerBroken(worker: AbstractWorkerProcess): Promise<void> {
if (this.running) {
if (this.workers.isCurrent(worker)) {
Expand Down

0 comments on commit 8fdf082

Please sign in to comment.