Skip to content

Commit

Permalink
process: add onClose event
Browse files Browse the repository at this point in the history
Add a `onClose` convenience event on processes, fired when all the
streams are supposed to be closed.

Signed-off-by: Paul Maréchal <paul.marechal@ericsson.com>
  • Loading branch information
paul-marechal committed Nov 21, 2019
1 parent 0860ab1 commit 6404e6b
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 20 deletions.
2 changes: 1 addition & 1 deletion dev-packages/electron/scripts/post-install.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async function fork(script, args = [], options = {}, callback) {
return new Promise((resolve, reject) => {
const subprocess = cp.fork(script, args, options);
subprocess.once('error', reject);
subprocess.once('exit', (code, signal) => {
subprocess.once('close', (code, signal) => {
if (signal || code) reject(new Error(`"${script}" exited with ${signal || code}`));
else resolve();
});
Expand Down
13 changes: 13 additions & 0 deletions packages/process/src/node/dev-null-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ import stream = require('stream');
* Writing goes to a black hole, reading returns `EOF`.
*/
export class DevNullStream extends stream.Duplex {

constructor(options: {
/**
* Makes this stream call `destroy` on itself, emitting the `close` event.
*/
autoDestroy?: boolean,
} = {}) {
super();
if (options.autoDestroy) {
this.destroy();
}
}

// tslint:disable-next-line:no-any
_write(chunk: any, encoding: string, callback: (err?: Error) => void): void {
callback();
Expand Down
24 changes: 19 additions & 5 deletions packages/process/src/node/multi-ring-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ export class MultiRingBufferReadableStream extends stream.Readable implements Di
this.deq(size);
}

_destroy(err: Error | undefined, callback: (err?: Error) => void): void {
this.ringBuffer.closeStream(this);
this.ringBuffer.closeReader(this.reader);
this.disposed = true;
this.removeAllListeners();
callback(err);
}

onData(): void {
if (this.more === true) {
this.deq(-1);
Expand All @@ -66,10 +74,7 @@ export class MultiRingBufferReadableStream extends stream.Readable implements Di
}

dispose(): void {
this.ringBuffer.closeStream(this);
this.ringBuffer.closeReader(this.reader);
this.disposed = true;
this.removeAllListeners();
this.destroy();
}
}

Expand All @@ -82,7 +87,7 @@ export interface MultiRingBufferOptions {
export interface WrappedPosition { newPos: number, wrap: boolean }

@injectable()
export class MultiRingBuffer {
export class MultiRingBuffer implements Disposable {

protected readonly buffer: Buffer;
protected head: number = -1;
Expand Down Expand Up @@ -276,6 +281,15 @@ export class MultiRingBuffer {
return this.readers.size;
}

/**
* Dispose all the attached readers/streams.
*/
dispose(): void {
for (const astream of this.streams.keys()) {
astream.dispose();
}
}

/* Position should be incremented if it goes pass end. */
protected shouldIncPos(pos: number, end: number, size: number): boolean {
const { newPos: newHead, wrap } = this.inc(end, size);
Expand Down
19 changes: 19 additions & 0 deletions packages/process/src/node/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export abstract class Process {
readonly id: number;
protected readonly startEmitter: Emitter<IProcessStartEvent> = new Emitter<IProcessStartEvent>();
protected readonly exitEmitter: Emitter<IProcessExitEvent> = new Emitter<IProcessExitEvent>();
protected readonly closeEmitter: Emitter<IProcessExitEvent> = new Emitter<IProcessExitEvent>();
protected readonly errorEmitter: Emitter<ProcessErrorEvent> = new Emitter<ProcessErrorEvent>();
protected _killed = false;

Expand Down Expand Up @@ -128,6 +129,9 @@ export abstract class Process {
return this.startEmitter.event;
}

/**
* Wait for the process to exit, streams can still emit data.
*/
get onExit(): Event<IProcessExitEvent> {
return this.exitEmitter.event;
}
Expand All @@ -136,6 +140,13 @@ export abstract class Process {
return this.errorEmitter.event;
}

/**
* Waits for both process exit and for all the streams to be closed.
*/
get onClose(): Event<IProcessExitEvent> {
return this.closeEmitter.event;
}

protected emitOnStarted(): void {
this.startEmitter.fire({});
}
Expand All @@ -150,6 +161,14 @@ export abstract class Process {
this.exitEmitter.fire(exitEvent);
}

/**
* Emit the onClose event for this process. Only one of code and signal
* should be defined.
*/
protected emitOnClose(code?: number, signal?: string): void {
this.closeEmitter.fire({ code, signal });
}

protected handleOnExit(event: IProcessExitEvent): void {
this._killed = true;
const signalSuffix = event.signal ? `, signal: ${event.signal}` : '';
Expand Down
29 changes: 20 additions & 9 deletions packages/process/src/node/raw-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,28 +115,39 @@ export class RawProcess extends Process {
error.code = error.code || 'Unknown error';
this.emitOnError(error as ProcessErrorEvent);
});
this.process.on('exit', (exitCode: number, signal: string) => {

// When no stdio option is passed, it is null by default.
this.outputStream = this.process.stdout || new DevNullStream({ autoDestroy: true });
this.inputStream = this.process.stdin || new DevNullStream({ autoDestroy: true });
this.errorStream = this.process.stderr || new DevNullStream({ autoDestroy: true });

this.process.on('exit', (exitCode, signal) => {
// node's child_process exit sets the unused parameter to null,
// but we want it to be undefined instead.
this.emitOnExit(
exitCode !== null ? exitCode : undefined,
signal !== null ? signal : undefined,
typeof exitCode === 'number' ? exitCode : undefined,
typeof signal === 'string' ? signal : undefined,
);
});

this.outputStream = this.process.stdout || new DevNullStream();
this.inputStream = this.process.stdin || new DevNullStream();
this.errorStream = this.process.stderr || new DevNullStream();
this.process.on('close', (exitCode, signal) => {
// node's child_process exit sets the unused parameter to null,
// but we want it to be undefined instead.
this.emitOnClose(
typeof exitCode === 'number' ? exitCode : undefined,
typeof signal === 'string' ? signal : undefined,
);
});

if (this.process.pid !== undefined) {
process.nextTick(this.emitOnStarted.bind(this));
}
} catch (error) {
/* When an error is thrown, set up some fake streams, so the client
code doesn't break because these field are undefined. */
this.outputStream = new DevNullStream();
this.inputStream = new DevNullStream();
this.errorStream = new DevNullStream();
this.outputStream = new DevNullStream({ autoDestroy: true });
this.inputStream = new DevNullStream({ autoDestroy: true });
this.errorStream = new DevNullStream({ autoDestroy: true });

/* Call the client error handler, but first give them a chance to register it. */
this.emitOnErrorAsync(error);
Expand Down
31 changes: 28 additions & 3 deletions packages/process/src/node/terminal-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ export class TerminalProcess extends Process {
protected readonly terminal: IPty | undefined;

readonly outputStream = this.createOutputStream();
readonly errorStream = new DevNullStream();
readonly errorStream = new DevNullStream({ autoDestroy: true });
readonly inputStream: Writable;

constructor(
Expand Down Expand Up @@ -210,7 +210,19 @@ export class TerminalProcess extends Process {
}
});

this.terminal.on('exit', (code: number, signal?: number) => {
// `node-pty` actually emits a `close` event when the underlying socket gets closed:
const onClose = new Promise<void>(resolve => {
// tslint:disable-next-line: no-any
this.terminal!.on('close' as any, () => resolve());
}).then(() => {
this.ringBuffer.dispose();
});
const onExit = new Promise<{ code: number, signal?: number }>(resolve => {
this.terminal!.on('exit', (code, signal) => resolve({ code, signal }));
});

onExit.then(exitStatus => {
const { code, signal } = exitStatus;
// Make sure to only pass either code or signal as !undefined, not
// both.
//
Expand All @@ -226,6 +238,19 @@ export class TerminalProcess extends Process {
}
});

/**
* Wait for all the streams to be closed, as well as for the process
* to exit.
*/
Promise.all([onExit, onClose]).then(resolved => {
const { code, signal } = resolved[0];
if (signal === undefined || signal === 0) {
this.emitOnClose(code, undefined);
} else {
this.emitOnClose(undefined, signame(signal));
}
});

this.terminal.on('data', (data: string) => {
ringBuffer.enq(data);
});
Expand All @@ -237,7 +262,7 @@ export class TerminalProcess extends Process {
});

} catch (error) {
this.inputStream = new DevNullStream();
this.inputStream = new DevNullStream({ autoDestroy: true });

// Normalize the error to make it as close as possible as what
// node's child_process.spawn would generate in the same
Expand Down
4 changes: 2 additions & 2 deletions packages/task/src/node/process/process-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class ProcessTask extends Task {
) {
super(taskManager, logger, options);

const toDispose = this.process.onExit(async event => {
const toDispose = this.process.onClose(async event => {
toDispose.dispose();
this.fireTaskExited(await this.getTaskExitedEvent(event));
});
Expand Down Expand Up @@ -100,7 +100,7 @@ export class ProcessTask extends Task {
if (this.process.killed) {
resolve();
} else {
const toDispose = this.process.onExit(event => {
const toDispose = this.process.onClose(event => {
toDispose.dispose();
resolve();
});
Expand Down

0 comments on commit 6404e6b

Please sign in to comment.