Skip to content

Commit

Permalink
esm: have a single hooks thread for all workers
Browse files Browse the repository at this point in the history
  • Loading branch information
dygabo committed Apr 26, 2024
1 parent 4221631 commit fa24d02
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 53 deletions.
2 changes: 2 additions & 0 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ port.on('message', (message) => {
manifestSrc,
manifestURL,
publicPort,
hooksPort,
workerData,
} = message;

Expand All @@ -111,6 +112,7 @@ port.on('message', (message) => {
}

require('internal/worker').assignEnvironmentData(environmentData);
require('internal/worker').hooksPort = hooksPort;

if (SharedArrayBuffer !== undefined) {
// The counter is only passed to the workers created by the main thread,
Expand Down
109 changes: 68 additions & 41 deletions lib/internal/modules/esm/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const {
const { exitCodes: { kUnsettledTopLevelAwait } } = internalBinding('errors');
const { URL } = require('internal/url');
const { canParse: URLCanParse } = internalBinding('url');
const { receiveMessageOnPort } = require('worker_threads');
const { receiveMessageOnPort, isMainThread } = require('worker_threads');
const {
isAnyArrayBuffer,
isArrayBufferView,
Expand Down Expand Up @@ -481,6 +481,7 @@ class HooksProxy {
* The InternalWorker instance, which lets us communicate with the loader thread.
*/
#worker;
#portToHooksThread;

/**
* The last notification ID received from the worker. This is used to detect
Expand All @@ -499,26 +500,42 @@ class HooksProxy {
#isReady = false;

constructor() {
const { InternalWorker } = require('internal/worker');
const { InternalWorker, hooksPort } = require('internal/worker');
MessageChannel ??= require('internal/worker/io').MessageChannel;

const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH);
this.#lock = new Int32Array(lock);

this.#worker = new InternalWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
if (isMainThread) {
// Main thread is the only one that creates the internal single hooks worker
const { port2: portFromHooksThread } = new MessageChannel();
this.#worker = new InternalWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
transferList: [portFromHooksThread],
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
this.#portToHooksThread = this.#worker;
} else {
this.#portToHooksThread = hooksPort;
}
}

waitForWorker() {
// There is one Hooks instance for each worker thread. But only one of these Hooks instances
// has an InternalWorker. That was the Hooks instance created for the main thread.
// It means for all Hooks instances that are not on the main thread => they are ready because they
// delegate to the single InternalWorker anyway.
if (!isMainThread) {
return;
}

if (!this.#isReady) {
const { kIsOnline } = require('internal/worker');
if (!this.#worker[kIsOnline]) {
Expand All @@ -535,6 +552,23 @@ class HooksProxy {
}
}

#postMessageToWorker(method, type, transferList, ...args) {
this.waitForWorker();
MessageChannel ??= require('internal/worker/io').MessageChannel;
const { port1: fromHooksThread, port2: toHooksThread } = new MessageChannel();

// Pass work to the worker.
debug(`post ${type} message to worker`, { method, args, transferList });
const usedTransferList = [toHooksThread];
if (transferList) {
ArrayPrototypePushApply(usedTransferList, transferList);
}
this.#portToHooksThread.postMessage(
{ __proto__: null, method, args, lock: this.#lock, port: toHooksThread }, usedTransferList);

return fromHooksThread;
}

/**
* Invoke a remote method asynchronously.
* @param {string} method Method to invoke
Expand All @@ -543,22 +577,7 @@ class HooksProxy {
* @returns {Promise<any>}
*/
async makeAsyncRequest(method, transferList, ...args) {
this.waitForWorker();

MessageChannel ??= require('internal/worker/io').MessageChannel;
const asyncCommChannel = new MessageChannel();

// Pass work to the worker.
debug('post async message to worker', { method, args, transferList });
const finalTransferList = [asyncCommChannel.port2];
if (transferList) {
ArrayPrototypePushApply(finalTransferList, transferList);
}
this.#worker.postMessage({
__proto__: null,
method, args,
port: asyncCommChannel.port2,
}, finalTransferList);
const fromHooksThread = this.#postMessageToWorker(method, 'Async', transferList, ...args);

if (this.#numberOfPendingAsyncResponses++ === 0) {
// On the next lines, the main thread will await a response from the worker thread that might
Expand All @@ -567,7 +586,11 @@ class HooksProxy {
// However we want to keep the process alive until the worker thread responds (or until the
// event loop of the worker thread is also empty), so we ref the worker until we get all the
// responses back.
this.#worker.ref();
if (this.#worker) {
this.#worker.ref();
} else {
this.#portToHooksThread.ref();
}
}

let response;
Expand All @@ -576,18 +599,25 @@ class HooksProxy {
await AtomicsWaitAsync(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId).value;
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = receiveMessageOnPort(asyncCommChannel.port1);
response = receiveMessageOnPort(fromHooksThread);
} while (response == null);
debug('got async response from worker', { method, args }, this.#lock);

if (--this.#numberOfPendingAsyncResponses === 0) {
// We got all the responses from the worker, its job is done (until next time).
this.#worker.unref();
if (this.#worker) {
this.#worker.unref();
} else {
this.#portToHooksThread.unref();
}
}

const body = this.#unwrapMessage(response);
asyncCommChannel.port1.close();
return body;
if (response.message.status === 'exit') {
process.exit(response.message.body);
}

fromHooksThread.close();
return this.#unwrapMessage(response);
}

/**
Expand All @@ -598,11 +628,7 @@ class HooksProxy {
* @returns {any}
*/
makeSyncRequest(method, transferList, ...args) {
this.waitForWorker();

// Pass work to the worker.
debug('post sync message to worker', { method, args, transferList });
this.#worker.postMessage({ __proto__: null, method, args }, transferList);
const fromHooksThread = this.#postMessageToWorker(method, 'Sync', transferList, ...args);

let response;
do {
Expand All @@ -611,14 +637,15 @@ class HooksProxy {
AtomicsWait(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId);
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = this.#worker.receiveMessageSync();
response = receiveMessageOnPort(fromHooksThread);
} while (response == null);
debug('got sync response from worker', { method, args });
if (response.message.status === 'never-settle') {
process.exit(kUnsettledTopLevelAwait);
} else if (response.message.status === 'exit') {
process.exit(response.message.body);
}
fromHooksThread.close();
return this.#unwrapMessage(response);
}

Expand Down
21 changes: 17 additions & 4 deletions lib/internal/modules/esm/loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap');
const {
urlToFilename,
} = require('internal/modules/helpers');
const { isMainThread } = require('worker_threads');
let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer;

/**
Expand Down Expand Up @@ -594,21 +595,26 @@ class CustomizedModuleLoader {
*/
constructor() {
getHooksProxy();
_hasCustomizations = true;
}

/**
* Register some loader specifier.
* Register a loader specifier.
* @param {string} originalSpecifier The specified URL path of the loader to
* be registered.
* @param {string} parentURL The parent URL from where the loader will be
* registered if using it package name as specifier
* @param {any} [data] Arbitrary data to be passed from the custom loader
* (user-land) to the worker.
* @param {any[]} [transferList] Objects in `data` that are changing ownership
* @returns {{ format: string, url: URL['href'] }}
* @returns {{ format: string, url: URL['href'] } | undefined}
*/
register(originalSpecifier, parentURL, data, transferList) {
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
if (isMainThread) {
// Only the main thread has a Hooks instance with worker thread. All other Worker threads
// delegate thier hooks to the HooksThread of the main thread.
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
}
}

/**
Expand All @@ -617,7 +623,7 @@ class CustomizedModuleLoader {
* be resolved.
* @param {string} [parentURL] The URL path of the module's parent.
* @param {ImportAttributes} importAttributes Attributes from the import
* statement or expression.
* statement or exp-ression.
* @returns {{ format: string, url: URL['href'] }}
*/
resolve(originalSpecifier, parentURL, importAttributes) {
Expand Down Expand Up @@ -706,6 +712,12 @@ function getHooksProxy() {
return hooksProxy;
}

let _hasCustomizations = false;
function hasCustomizations() {
return _hasCustomizations;
}


let cascadedLoader;

/**
Expand Down Expand Up @@ -767,6 +779,7 @@ function register(specifier, parentURL = undefined, options) {

module.exports = {
createModuleLoader,
hasCustomizations,
getHooksProxy,
getOrInitializeCascadedLoader,
register,
Expand Down
46 changes: 38 additions & 8 deletions lib/internal/modules/esm/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
// so it can detect the exit event.
const { exit } = process;
process.exit = function(code) {
if (hooks) {
for (const registeredPort of allThreadRegisteredHandlerPorts) {
registeredPort.postMessage(wrapMessage('exit', code ?? process.exitCode));
}
for (const { port, lock: operationLock } of unsettledResponsePorts) {
port.postMessage(wrapMessage('exit', code ?? process.exitCode));
// Wake all threads that have pending operations. Is that needed???
AtomicsAdd(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
AtomicsNotify(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
}
}
syncCommPort.postMessage(wrapMessage('exit', code ?? process.exitCode));
AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
Expand Down Expand Up @@ -145,8 +156,11 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
const unsettledResponsePorts = new SafeSet();

process.on('beforeExit', () => {
for (const port of unsettledResponsePorts) {
for (const { port, lock: operationLock } of unsettledResponsePorts) {
port.postMessage(wrapMessage('never-settle'));
// Wake all threads that have pending operations. Is that needed???
AtomicsAdd(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
AtomicsNotify(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
}
unsettledResponsePorts.clear();

Expand All @@ -164,24 +178,39 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
setImmediate(() => {});
});

const allThreadRegisteredHandlerPorts = [];
function registerHandler(toWorkerThread) {
toWorkerThread.on('message', handleMessage);
allThreadRegisteredHandlerPorts.push(toWorkerThread);
}

function getMessageHandler(method) {
if (method === '#registerWorkerClient') {
return registerHandler;
}
return hooks[method];
}

/**
* Handles incoming messages from the main thread or other workers.
* @param {object} options - The options object.
* @param {string} options.method - The name of the hook.
* @param {Array} options.args - The arguments to pass to the method.
* @param {MessagePort} options.port - The message port to use for communication.
* @param {Int32Array} options.lock - The shared memory where the caller expects to get awaken.
*/
async function handleMessage({ method, args, port }) {
async function handleMessage({ method, args, port, lock: msgLock }) {
// Each potential exception needs to be caught individually so that the correct error is sent to
// the main thread.
let hasError = false;
let shouldRemoveGlobalErrorHandler = false;
assert(typeof hooks[method] === 'function');
assert(typeof getMessageHandler(method) === 'function');
if (port == null && !hasUncaughtExceptionCaptureCallback()) {
// When receiving sync messages, we want to unlock the main thread when there's an exception.
process.on('uncaughtException', errorHandler);
shouldRemoveGlobalErrorHandler = true;
}
const usedLock = msgLock ?? lock;

// We are about to yield the execution with `await ReflectApply` below. In case the code
// following the `await` never runs, we remove the message handler so the `beforeExit` event
Expand All @@ -192,17 +221,18 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
clearImmediate(immediate);
immediate = setImmediate(checkForMessages).unref();

unsettledResponsePorts.add(port ?? syncCommPort);
const unsettledActionData = { port: port ?? syncCommPort, lock: usedLock };
unsettledResponsePorts.add(unsettledActionData);

let response;
try {
response = await ReflectApply(hooks[method], hooks, args);
response = await ReflectApply(getMessageHandler(method), hooks, args);
} catch (exception) {
hasError = true;
response = exception;
}

unsettledResponsePorts.delete(port ?? syncCommPort);
unsettledResponsePorts.delete(unsettledActionData);

// Send the method response (or exception) to the main thread.
try {
Expand All @@ -215,8 +245,8 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
(port ?? syncCommPort).postMessage(wrapMessage('error', exception));
}

AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
AtomicsAdd(usedLock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
AtomicsNotify(usedLock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
if (shouldRemoveGlobalErrorHandler) {
process.off('uncaughtException', errorHandler);
}
Expand Down
Loading

0 comments on commit fa24d02

Please sign in to comment.