Skip to content

Commit

Permalink
process: split worker IO into internal/worker/io.js
Browse files Browse the repository at this point in the history
- Move `setupProcessStdio` which contains write access to
  the process object into `bootstrap/node.js`
- Move `MessagePort`, `MessageChannel`, `ReadableWorkerStdio`,
  and `WritableWorkerStdio` into `internal/worker/io.js`
- Move more worker-specific bootstrap code into
  `internal/process/worker_thread_only` from `setupChild`
  in `internal/worker.js`, and move the `process._fatalException`
  overwrite into `bootstrap/node.js` for clarity.

PR-URL: #25199
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
joyeecheung committed Dec 31, 2018
1 parent 7163fbf commit af23715
Show file tree
Hide file tree
Showing 8 changed files with 358 additions and 299 deletions.
43 changes: 39 additions & 4 deletions lib/internal/bootstrap/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,13 @@ function startup() {
}

if (isMainThread) {
mainThreadSetup.setupStdio();
const { getStdout, getStdin, getStderr } =
NativeModule.require('internal/process/stdio').getMainThreadStdio();
setupProcessStdio(getStdout, getStdin, getStderr);
} else {
workerThreadSetup.setupStdio();
const { getStdout, getStdin, getStderr } =
workerThreadSetup.initializeWorkerStdio();
setupProcessStdio(getStdout, getStdin, getStderr);
}

if (global.__coverage__)
Expand Down Expand Up @@ -312,8 +316,14 @@ function startup() {
function startExecution() {
// This means we are in a Worker context, and any script execution
// will be directed by the worker module.
if (internalBinding('worker').getEnvMessagePort() !== undefined) {
NativeModule.require('internal/worker').setupChild();
if (!isMainThread) {
const workerThreadSetup = NativeModule.require(
'internal/process/worker_thread_only'
);
// Set up the message port and start listening
const { workerFatalExeception } = workerThreadSetup.setup();
// Overwrite fatalException
process._fatalException = workerFatalExeception;
return;
}

Expand Down Expand Up @@ -505,6 +515,31 @@ function setupProcessObject() {
EventEmitter.call(process);
}

function setupProcessStdio(getStdout, getStdin, getStderr) {
Object.defineProperty(process, 'stdout', {
configurable: true,
enumerable: true,
get: getStdout
});

Object.defineProperty(process, 'stderr', {
configurable: true,
enumerable: true,
get: getStderr
});

Object.defineProperty(process, 'stdin', {
configurable: true,
enumerable: true,
get: getStdin
});

process.openStdin = function() {
process.stdin.resume();
return process.stdin;
};
}

function setupGlobalVariables() {
Object.defineProperty(global, Symbol.toStringTag, {
value: 'global',
Expand Down
10 changes: 0 additions & 10 deletions lib/internal/process/main_thread_only.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,6 @@ const {
validateString
} = require('internal/validators');

const {
setupProcessStdio,
getMainThreadStdio
} = require('internal/process/stdio');

function setupStdio() {
setupProcessStdio(getMainThreadStdio());
}

// The execution of this function itself should not cause any side effects.
function wrapProcessMethods(binding) {
function chdir(directory) {
Expand Down Expand Up @@ -174,7 +165,6 @@ function setupChildProcessIpcChannel() {
}

module.exports = {
setupStdio,
wrapProcessMethods,
setupSignalHandlers,
setupChildProcessIpcChannel,
Expand Down
26 changes: 0 additions & 26 deletions lib/internal/process/stdio.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict';

exports.setupProcessStdio = setupProcessStdio;
exports.getMainThreadStdio = getMainThreadStdio;

function dummyDestroy(err, cb) { cb(err); }
Expand Down Expand Up @@ -134,31 +133,6 @@ function getMainThreadStdio() {
};
}

function setupProcessStdio({ getStdout, getStdin, getStderr }) {
Object.defineProperty(process, 'stdout', {
configurable: true,
enumerable: true,
get: getStdout
});

Object.defineProperty(process, 'stderr', {
configurable: true,
enumerable: true,
get: getStderr
});

Object.defineProperty(process, 'stdin', {
configurable: true,
enumerable: true,
get: getStdin
});

process.openStdin = function() {
process.stdin.resume();
return process.stdin;
};
}

function createWritableStdioStream(fd) {
var stream;
const tty_wrap = internalBinding('tty_wrap');
Expand Down
51 changes: 41 additions & 10 deletions lib/internal/process/worker_thread_only.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,54 @@

// This file contains process bootstrappers that can only be
// run in the worker thread.
const {
getEnvMessagePort,
threadId
} = internalBinding('worker');

const debug = require('util').debuglog('worker');

const {
setupProcessStdio
} = require('internal/process/stdio');
kWaitingStreams,
ReadableWorkerStdio,
WritableWorkerStdio
} = require('internal/worker/io');

const {
workerStdio
createMessageHandler,
createWorkerFatalExeception
} = require('internal/worker');

function setupStdio() {
setupProcessStdio({
getStdout: () => workerStdio.stdout,
getStderr: () => workerStdio.stderr,
getStdin: () => workerStdio.stdin
});
const workerStdio = {};

function initializeWorkerStdio() {
const port = getEnvMessagePort();
port[kWaitingStreams] = 0;
workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');

return {
getStdout() { return workerStdio.stdout; },
getStderr() { return workerStdio.stderr; },
getStdin() { return workerStdio.stdin; }
};
}

function setup() {
debug(`[${threadId}] is setting up worker child environment`);

const port = getEnvMessagePort();
const publicWorker = require('worker_threads');
port.on('message', createMessageHandler(publicWorker, port, workerStdio));
port.start();

return {
workerFatalExeception: createWorkerFatalExeception(port)
};
}

module.exports = {
setupStdio
initializeWorkerStdio,
setup
};
Loading

0 comments on commit af23715

Please sign in to comment.