Skip to content

Commit

Permalink
module: have a single hooks thread for all workers
Browse files Browse the repository at this point in the history
PR-URL: nodejs#52706
Reviewed-By: Geoffrey Booth <webadmin@geoffreybooth.com>
Reviewed-By: Jacob Smith <jacob@frende.me>
Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de>
  • Loading branch information
dygabo authored and sophoniie committed Jun 20, 2024
1 parent d597f72 commit 8c1b6ff
Show file tree
Hide file tree
Showing 23 changed files with 395 additions and 81 deletions.
2 changes: 2 additions & 0 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ port.on('message', (message) => {
filename,
hasStdin,
publicPort,
hooksPort,
workerData,
} = message;

Expand All @@ -109,6 +110,7 @@ port.on('message', (message) => {
}

require('internal/worker').assignEnvironmentData(environmentData);
require('internal/worker').hooksPort = hooksPort;

if (SharedArrayBuffer !== undefined) {
// The counter is only passed to the workers created by the main thread,
Expand Down
127 changes: 84 additions & 43 deletions lib/internal/modules/esm/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const {
const { exitCodes: { kUnsettledTopLevelAwait } } = internalBinding('errors');
const { URL } = require('internal/url');
const { canParse: URLCanParse } = internalBinding('url');
const { receiveMessageOnPort } = require('worker_threads');
const { receiveMessageOnPort, isMainThread } = require('worker_threads');
const {
isAnyArrayBuffer,
isArrayBufferView,
Expand Down Expand Up @@ -482,6 +482,8 @@ class HooksProxy {
*/
#worker;

#portToHooksThread;

/**
* The last notification ID received from the worker. This is used to detect
* if the worker has already sent a notification before putting the main
Expand All @@ -499,26 +501,38 @@ class HooksProxy {
#isReady = false;

constructor() {
const { InternalWorker } = require('internal/worker');
MessageChannel ??= require('internal/worker/io').MessageChannel;

const { InternalWorker, hooksPort } = require('internal/worker');
const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH);
this.#lock = new Int32Array(lock);

this.#worker = new InternalWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
if (isMainThread) {
// Main thread is the only one that creates the internal single hooks worker
this.#worker = new InternalWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
this.#portToHooksThread = this.#worker;
} else {
this.#portToHooksThread = hooksPort;
}
}

waitForWorker() {
// There is one Hooks instance for each worker thread. But only one of these Hooks instances
// has an InternalWorker. That was the Hooks instance created for the main thread.
// It means for all Hooks instances that are not on the main thread => they are ready because they
// delegate to the single InternalWorker anyway.
if (!isMainThread) {
return;
}

if (!this.#isReady) {
const { kIsOnline } = require('internal/worker');
if (!this.#worker[kIsOnline]) {
Expand All @@ -535,6 +549,37 @@ class HooksProxy {
}
}

#postMessageToWorker(method, type, transferList, args) {
this.waitForWorker();

MessageChannel ??= require('internal/worker/io').MessageChannel;

const {
port1: fromHooksThread,
port2: toHooksThread,
} = new MessageChannel();

// Pass work to the worker.
debug(`post ${type} message to worker`, { method, args, transferList });
const usedTransferList = [toHooksThread];
if (transferList) {
ArrayPrototypePushApply(usedTransferList, transferList);
}

this.#portToHooksThread.postMessage(
{
__proto__: null,
args,
lock: this.#lock,
method,
port: toHooksThread,
},
usedTransferList,
);

return fromHooksThread;
}

/**
* Invoke a remote method asynchronously.
* @param {string} method Method to invoke
Expand All @@ -543,22 +588,7 @@ class HooksProxy {
* @returns {Promise<any>}
*/
async makeAsyncRequest(method, transferList, ...args) {
this.waitForWorker();

MessageChannel ??= require('internal/worker/io').MessageChannel;
const asyncCommChannel = new MessageChannel();

// Pass work to the worker.
debug('post async message to worker', { method, args, transferList });
const finalTransferList = [asyncCommChannel.port2];
if (transferList) {
ArrayPrototypePushApply(finalTransferList, transferList);
}
this.#worker.postMessage({
__proto__: null,
method, args,
port: asyncCommChannel.port2,
}, finalTransferList);
const fromHooksThread = this.#postMessageToWorker(method, 'Async', transferList, args);

if (this.#numberOfPendingAsyncResponses++ === 0) {
// On the next lines, the main thread will await a response from the worker thread that might
Expand All @@ -567,7 +597,11 @@ class HooksProxy {
// However we want to keep the process alive until the worker thread responds (or until the
// event loop of the worker thread is also empty), so we ref the worker until we get all the
// responses back.
this.#worker.ref();
if (this.#worker) {
this.#worker.ref();
} else {
this.#portToHooksThread.ref();
}
}

let response;
Expand All @@ -576,18 +610,26 @@ class HooksProxy {
await AtomicsWaitAsync(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId).value;
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = receiveMessageOnPort(asyncCommChannel.port1);
response = receiveMessageOnPort(fromHooksThread);
} while (response == null);
debug('got async response from worker', { method, args }, this.#lock);

if (--this.#numberOfPendingAsyncResponses === 0) {
// We got all the responses from the worker, its job is done (until next time).
this.#worker.unref();
if (this.#worker) {
this.#worker.unref();
} else {
this.#portToHooksThread.unref();
}
}

if (response.message.status === 'exit') {
process.exit(response.message.body);
}

const body = this.#unwrapMessage(response);
asyncCommChannel.port1.close();
return body;
fromHooksThread.close();

return this.#unwrapMessage(response);
}

/**
Expand All @@ -598,11 +640,7 @@ class HooksProxy {
* @returns {any}
*/
makeSyncRequest(method, transferList, ...args) {
this.waitForWorker();

// Pass work to the worker.
debug('post sync message to worker', { method, args, transferList });
this.#worker.postMessage({ __proto__: null, method, args }, transferList);
const fromHooksThread = this.#postMessageToWorker(method, 'Sync', transferList, args);

let response;
do {
Expand All @@ -611,14 +649,17 @@ class HooksProxy {
AtomicsWait(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId);
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = this.#worker.receiveMessageSync();
response = receiveMessageOnPort(fromHooksThread);
} while (response == null);
debug('got sync response from worker', { method, args });
if (response.message.status === 'never-settle') {
process.exit(kUnsettledTopLevelAwait);
} else if (response.message.status === 'exit') {
process.exit(response.message.body);
}

fromHooksThread.close();

return this.#unwrapMessage(response);
}

Expand Down
19 changes: 16 additions & 3 deletions lib/internal/modules/esm/loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap');
const {
urlToFilename,
} = require('internal/modules/helpers');
const { isMainThread } = require('worker_threads');
let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer;

/**
Expand Down Expand Up @@ -607,21 +608,26 @@ class CustomizedModuleLoader {
*/
constructor() {
getHooksProxy();
_hasCustomizations = true;
}

/**
* Register some loader specifier.
* Register a loader specifier.
* @param {string} originalSpecifier The specified URL path of the loader to
* be registered.
* @param {string} parentURL The parent URL from where the loader will be
* registered if using it package name as specifier
* @param {any} [data] Arbitrary data to be passed from the custom loader
* (user-land) to the worker.
* @param {any[]} [transferList] Objects in `data` that are changing ownership
* @returns {{ format: string, url: URL['href'] }}
* @returns {{ format: string, url: URL['href'] } | undefined}
*/
register(originalSpecifier, parentURL, data, transferList) {
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
if (isMainThread) {
// Only the main thread has a Hooks instance with worker thread. All other Worker threads
// delegate their hooks to the HooksThread of the main thread.
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
}
}

/**
Expand Down Expand Up @@ -719,6 +725,12 @@ function getHooksProxy() {
return hooksProxy;
}

let _hasCustomizations = false;
function hasCustomizations() {
return _hasCustomizations;
}


let cascadedLoader;

/**
Expand Down Expand Up @@ -780,6 +792,7 @@ function register(specifier, parentURL = undefined, options) {

module.exports = {
createModuleLoader,
hasCustomizations,
getHooksProxy,
getOrInitializeCascadedLoader,
register,
Expand Down
Loading

0 comments on commit 8c1b6ff

Please sign in to comment.