From 061261697b310347ac777bf116a579f161e5358a Mon Sep 17 00:00:00 2001 From: Mestery Date: Fri, 27 Aug 2021 00:34:40 +0200 Subject: [PATCH] fixup! feat(node): worker_threads --- node/events.ts | 84 +++++++------- node/testdata/worker_threads.ts | 29 +++++ node/worker_threads.ts | 193 ++++++++++++++++++++------------ node/worker_threads_test.ts | 151 +++++++++++++++++++++++-- 4 files changed, 340 insertions(+), 117 deletions(-) create mode 100644 node/testdata/worker_threads.ts diff --git a/node/events.ts b/node/events.ts index 64d51bf81210a..0f1252298642d 100644 --- a/node/events.ts +++ b/node/events.ts @@ -32,6 +32,22 @@ export interface WrappedFunction extends Function { listener: GenericFunction; } +export interface OnceableEventEmitter { + once(event: string | symbol, listener: GenericFunction): unknown; + removeListener( + event: string | symbol, + listener: GenericFunction, + ): unknown; +} + +export interface OnableEventEmitter { + on(event: string | symbol, listener: GenericFunction): unknown; + removeListener( + event: string | symbol, + listener: GenericFunction, + ): unknown; +} + function ensureArray(maybeArray: T[] | T): T[] { return Array.isArray(maybeArray) ? maybeArray : [maybeArray]; } @@ -175,8 +191,8 @@ export class EventEmitter { * Returns an array listing the events for which the emitter has * registered listeners. */ - public eventNames(): [string | symbol] { - return Reflect.ownKeys(this._events) as [string | symbol]; + public eventNames(): (string | symbol)[] { + return Reflect.ownKeys(this._events); } /** @@ -483,51 +499,37 @@ export class EventEmitter { * will resolve with an array of all the arguments emitted to the given event. */ public static once( - emitter: EventEmitter | EventTarget, - name: string, + emitter: OnceableEventEmitter, + name: string | symbol, // deno-lint-ignore no-explicit-any ): Promise { return new Promise((resolve, reject) => { - if (emitter instanceof EventTarget) { - // EventTarget does not have `error` event semantics like Node - // EventEmitters, we do not listen to `error` events here. - emitter.addEventListener( - name, - (...args) => { - resolve(args); - }, - { once: true, passive: false, capture: false }, - ); - return; - } else if (emitter instanceof EventEmitter) { + // deno-lint-ignore no-explicit-any + const eventListener = (...args: any[]): void => { + if (errorListener !== undefined) { + emitter.removeListener("error", errorListener); + } + resolve(args); + }; + let errorListener: GenericFunction; + + // Adding an error listener is not optional because + // if an error is thrown on an event emitter we cannot + // guarantee that the actual event we are waiting will + // be fired. The result could be a silent way to create + // memory or file descriptor leaks, which is something + // we should avoid. + if (name !== "error") { // deno-lint-ignore no-explicit-any - const eventListener = (...args: any[]): void => { - if (errorListener !== undefined) { - emitter.removeListener("error", errorListener); - } - resolve(args); + errorListener = (err: any): void => { + emitter.removeListener(name, eventListener); + reject(err); }; - let errorListener: GenericFunction; - - // Adding an error listener is not optional because - // if an error is thrown on an event emitter we cannot - // guarantee that the actual event we are waiting will - // be fired. The result could be a silent way to create - // memory or file descriptor leaks, which is something - // we should avoid. - if (name !== "error") { - // deno-lint-ignore no-explicit-any - errorListener = (err: any): void => { - emitter.removeListener(name, eventListener); - reject(err); - }; - - emitter.once("error", errorListener); - } - emitter.once(name, eventListener); - return; + emitter.once("error", errorListener); } + + emitter.once(name, eventListener); }); } @@ -538,7 +540,7 @@ export class EventEmitter { * emitted event arguments. */ public static on( - emitter: EventEmitter, + emitter: OnableEventEmitter, event: string | symbol, ): AsyncIterable { // deno-lint-ignore no-explicit-any diff --git a/node/testdata/worker_threads.ts b/node/testdata/worker_threads.ts new file mode 100644 index 0000000000000..7d89d2afc69a3 --- /dev/null +++ b/node/testdata/worker_threads.ts @@ -0,0 +1,29 @@ +import { + getEnvironmentData, + isMainThread, + parentPort, + threadId, + workerData, +} from "../worker_threads.ts"; +import { once } from "../events.ts"; + +async function message(expectedMessage: string) { + const [message] = await once(parentPort, "message"); + if (message !== expectedMessage) { + // fail test + parentPort.close(); + } +} + +await message("Hello, how are you my thread?"); +parentPort.postMessage("I'm fine!"); + +parentPort.postMessage({ + isMainThread, + threadId, + workerData: Array.isArray(workerData) && + workerData[workerData.length - 1] instanceof MessagePort + ? workerData.slice(0, -1) + : workerData, + envData: [getEnvironmentData("test"), getEnvironmentData(1)], +}); diff --git a/node/worker_threads.ts b/node/worker_threads.ts index ceee0e09c09a2..3919f11e194ae 100644 --- a/node/worker_threads.ts +++ b/node/worker_threads.ts @@ -1,43 +1,15 @@ /// import { notImplemented } from "./_utils.ts"; -import { resolve, toFileUrl } from "../path/mod.ts"; -import { EventEmitter } from "./events.ts"; +import { EventEmitter, GenericFunction, once } from "./events.ts"; let environmentData = new Map(); let threads = 0; -function attachProxy>(target: T, child: object) { - function selectTarget(property: PropertyKey) { - return Reflect.has(target, property) ? target : child; - } - - const getOrSet = (type: "get" | "set") => (_: T, property: string | symbol, receiver: any): any => { - const target = selectTarget(property); - const ret = Reflect[type](target, property, receiver); - if (typeof ret === "function") return ret.bind(target); - return ret; - }; - - return new Proxy(target, { - get: getOrSet("get"), - set: getOrSet("set"), - deleteProperty(_, property) { - return Reflect.deleteProperty(selectTarget(property), property); - }, - getOwnPropertyDescriptor(_, property) { - return Reflect.getOwnPropertyDescriptor(selectTarget(property), property); - }, - has: (target, property) => Reflect.has(target, property) || Reflect.has(child, property), - ownKeys: (target) => Reflect.ownKeys(target).concat(Reflect.ownKeys(child)), - }); -} - -interface _WorkerOptions { +interface WorkerOptions { // only for typings - argv?: any[]; - env?: object; - eval?: boolean; + argv?: unknown[]; + env?: Record; execArgv?: string[]; stdin?: boolean; stdout?: boolean; @@ -50,42 +22,61 @@ interface _WorkerOptions { stackSizeMb?: number; }; + eval?: boolean; transferList?: Transferable[]; - workerData?: any; + workerData?: unknown; } -interface _Worker extends Worker, EventEmitter {} -class _Worker extends Worker { + +const kHandle = Symbol("kHandle"); +class _Worker extends EventEmitter { readonly threadId: number; - readonly resourceLimits: Required> = { + readonly resourceLimits: Required< + NonNullable + > = { maxYoungGenerationSizeMb: -1, maxOldGenerationSizeMb: -1, codeRangeSizeMb: -1, stackSizeMb: 4, }; - private readonly emitter = new EventEmitter(); - - constructor(specifier: URL | string, options?: _WorkerOptions) { - super(typeof specifier === "string" ? toFileUrl(resolve(specifier)) : specifier, { - ...(options || {}), - type: "module", - // unstable - deno: { namespace: true }, - }); - this.addEventListener("error", (event) => this.emitter.emit("error", event.error || event.message)); - this.addEventListener("messageerror", (event) => this.emitter.emit("messageerror", event.data)); - this.addEventListener("message", (event) => this.emitter.emit("message", event.data)); - this.postMessage({ + private readonly [kHandle]: globalThis.Worker; + + postMessage: Worker["postMessage"]; + + constructor(specifier: URL | string, options?: WorkerOptions) { + super(); + const handle = this[kHandle] = new globalThis.Worker( + options?.eval === true ? `data:text/javascript,${specifier}` : specifier, + { + ...(options || {}), + type: "module", + // unstable + deno: { namespace: true }, + }, + ); + handle.addEventListener( + "error", + (event) => this.emit("error", event.error || event.message), + ); + handle.addEventListener( + "messageerror", + (event) => this.emit("messageerror", event.data), + ); + handle.addEventListener( + "message", + (event) => this.emit("message", event.data), + ); + handle.postMessage({ environmentData, threadId: (this.threadId = ++threads), workerData: options?.workerData, }, options?.transferList || []); - this.emitter.emit("online"); - return attachProxy(this, this.emitter); + this.postMessage = this[kHandle].postMessage.bind(this[kHandle]); + this.emit("online"); } terminate() { - super.terminate(); - this.emitter.emit("exit", 0); + this[kHandle].terminate(); + this.emit("exit", 0); } readonly getHeapSnapshot = notImplemented; @@ -93,7 +84,9 @@ class _Worker extends Worker { readonly performance = globalThis.performance; } -export const isMainThread = typeof WorkerGlobalScope === "undefined" || self instanceof WorkerGlobalScope === false; +export const isMainThread = typeof DedicatedWorkerGlobalScope === "undefined" || + self instanceof DedicatedWorkerGlobalScope === false; + // fake resourceLimits export const resourceLimits = isMainThread ? {} : { maxYoungGenerationSizeMb: 48, @@ -103,25 +96,87 @@ export const resourceLimits = isMainThread ? {} : { }; let threadId = 0; -let workerData = null; -type ParentPort = WorkerGlobalScope & typeof globalThis & EventEmitter; +let workerData: unknown = null; + +// Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611 +interface NodeEventTarget extends + Pick< + EventEmitter, + "eventNames" | "listenerCount" | "emit" | "removeAllListeners" + > { + setMaxListeners(n: number): void; + getMaxListeners(): number; + off(eventName: string, listener: GenericFunction): NodeEventTarget; + on(eventName: string, listener: GenericFunction): NodeEventTarget; + once(eventName: string, listener: GenericFunction): NodeEventTarget; + addListener: NodeEventTarget["on"]; + removeListener: NodeEventTarget["off"]; +} + +type ParentPort = + & DedicatedWorkerGlobalScope + & typeof globalThis + & NodeEventTarget; + +// deno-lint-ignore no-explicit-any let parentPort: ParentPort = null as any; if (!isMainThread) { - ({ threadId, workerData, environmentData } = await new Promise((resolve) => { - self.addEventListener("message", (event: MessageEvent) => resolve(event.data), { once: true }); - })); - parentPort = attachProxy(self as ParentPort, new EventEmitter()); + // deno-lint-ignore no-explicit-any + const listeners = new WeakMap any>(); + + parentPort = self as ParentPort; + parentPort.off = parentPort.removeListener = function ( + this: ParentPort, + name, + listener, + ) { + this.removeEventListener(name, listeners.get(listener)!); + listeners.delete(listener); + return this; + }; + parentPort.on = parentPort.addListener = function ( + this: ParentPort, + name, + listener, + ) { + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + listeners.set(listener, _listener); + this.addEventListener(name, _listener); + return this; + }; + parentPort.once = function (this: ParentPort, name, listener) { + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + listeners.set(listener, _listener); + this.addEventListener(name, _listener); + return this; + }; + + // mocks + parentPort.setMaxListeners = () => {}; + parentPort.getMaxListeners = () => Infinity; + parentPort.eventNames = () => [""]; + parentPort.listenerCount = () => 0; + + parentPort.emit = () => notImplemented(); + parentPort.removeAllListeners = () => notImplemented(); + + ([{ threadId, workerData, environmentData }] = await once( + parentPort, + "message", + )); + + // alias parentPort.addEventListener("offline", () => parentPort.emit("close")); - parentPort.addEventListener("message", (event: MessageEvent) => parentPort.emit("message", event.data)); - parentPort.addEventListener("messageerror", (event: MessageEvent) => parentPort.emit("messageerror", event.data)); } -export function getEnvironmentData(key: any) { +export function getEnvironmentData(key: unknown) { return environmentData.get(key); } -export function setEnvironmentData(key: any, value: any) { +export function setEnvironmentData(key: unknown, value?: unknown) { if (value === undefined) { environmentData.delete(key); } else { @@ -135,13 +190,13 @@ export const BroadcastChannel = globalThis.BroadcastChannel; export const SHARE_ENV = Symbol.for("nodejs.worker_threads.SHARE_ENV"); export { _Worker as Worker, - parentPort, - threadId, - workerData, notImplemented as markAsUntransferable, notImplemented as moveMessagePortToContext, notImplemented as receiveMessageOnPort, -} + parentPort, + threadId, + workerData, +}; export default { markAsUntransferable: notImplemented, @@ -159,4 +214,4 @@ export default { resourceLimits, parentPort, isMainThread, -} +}; diff --git a/node/worker_threads_test.ts b/node/worker_threads_test.ts index 3cbe78137cb2c..175df57b2bec3 100644 --- a/node/worker_threads_test.ts +++ b/node/worker_threads_test.ts @@ -1,39 +1,40 @@ -import { assertEquals, assertObjectMatch } from "../testing/asserts.ts"; +import { assert, assertEquals, assertObjectMatch } from "../testing/asserts.ts"; +import { EventEmitter, once } from "./events.ts"; import * as workerThreads from "./worker_threads.ts"; Deno.test({ name: "[worker_threads] isMainThread", fn() { assertEquals(workerThreads.isMainThread, true); - } + }, }); Deno.test({ name: "[worker_threads] threadId", fn() { assertEquals(workerThreads.threadId, 0); - } + }, }); Deno.test({ name: "[worker_threads] resourceLimits", fn() { assertObjectMatch(workerThreads.resourceLimits, {}); - } + }, }); Deno.test({ name: "[worker_threads] parentPort", fn() { assertEquals(workerThreads.parentPort, null); - } + }, }); Deno.test({ name: "[worker_threads] workerData", fn() { assertEquals(workerThreads.workerData, null); - } + }, }); Deno.test({ @@ -41,6 +42,142 @@ Deno.test({ fn() { workerThreads.setEnvironmentData("test", "test"); assertEquals(workerThreads.getEnvironmentData("test"), "test"); - } + // delete + workerThreads.setEnvironmentData("test"); + assertEquals(workerThreads.getEnvironmentData("test"), undefined); + }, }); +Deno.test({ + name: "[worker_threads] Worker threadId", + async fn() { + const worker = new workerThreads.Worker( + new URL("./testdata/worker_threads.ts", import.meta.url), + ); + worker.postMessage("Hello, how are you my thread?"); + await once(worker, "message"); + assertEquals((await once(worker, "message"))[0].threadId, 1); + worker.terminate(); + + const worker1 = new workerThreads.Worker( + new URL("./testdata/worker_threads.ts", import.meta.url), + ); + worker1.postMessage("Hello, how are you my thread?"); + await once(worker1, "message"); + assertEquals((await once(worker1, "message"))[0].threadId, 2); + worker1.terminate(); + }, +}); + +Deno.test({ + name: "[worker_threads] Worker basics", + async fn() { + workerThreads.setEnvironmentData("test", "test"); + workerThreads.setEnvironmentData(1, { + test: "random", + random: "test", + }); + const { port1 } = new MessageChannel(); + const worker = new workerThreads.Worker( + new URL("./testdata/worker_threads.ts", import.meta.url), + { + workerData: ["hey", true, false, 2, port1], + transferList: [port1], + }, + ); + worker.postMessage("Hello, how are you my thread?"); + assertEquals((await once(worker, "message"))[0], "I'm fine!"); + const data = (await once(worker, "message"))[0]; + // data.threadId can be 1 when this test is runned individually + if (data.threadId === 1) data.threadId = 3; + assertObjectMatch(data, { + isMainThread: false, + threadId: 3, + workerData: ["hey", true, false, 2], + envData: ["test", { test: "random", random: "test" }], + }); + worker.terminate(); + }, + sanitizeResources: false, +}); + +const workerThreadsURL = JSON.stringify( + new URL("./worker_threads.ts", import.meta.url).toString(), +); + +Deno.test({ + name: "[worker_threads] Worker eval", + async fn() { + const worker = new workerThreads.Worker( + ` + import { parentPort } from ${workerThreadsURL}; + parentPort.postMessage("It works!"); + `, + { + eval: true, + }, + ); + assertEquals((await once(worker, "message"))[0], "It works!"); + worker.terminate(); + }, +}); + +Deno.test({ + name: "[worker_threads] inheritences", + async fn() { + const eventsURL = JSON.stringify( + new URL("./events.ts", import.meta.url).toString(), + ); + + const worker = new workerThreads.Worker( + ` + import { EventEmitter } from ${eventsURL}; + import { parentPort } from ${workerThreadsURL}; + parentPort.postMessage(parentPort instanceof EventTarget); + parentPort.postMessage(parentPort instanceof EventEmitter); + `, + { + eval: true, + }, + ); + assertEquals((await once(worker, "message"))[0], true); + assertEquals((await once(worker, "message"))[0], false); + assert(worker instanceof EventEmitter); + assert(!(worker instanceof EventTarget)); + worker.terminate(); + }, +}); + +Deno.test({ + name: "[worker_threads] Worker workerData", + async fn() { + const worker = new workerThreads.Worker( + new URL("./testdata/worker_threads.ts", import.meta.url), + { + workerData: null, + }, + ); + worker.postMessage("Hello, how are you my thread?"); + await once(worker, "message"); + assertEquals((await once(worker, "message"))[0].workerData, null); + worker.terminate(); + + const worker1 = new workerThreads.Worker( + new URL("./testdata/worker_threads.ts", import.meta.url), + ); + worker1.postMessage("Hello, how are you my thread?"); + await once(worker1, "message"); + assertEquals((await once(worker1, "message"))[0].workerData, undefined); + worker1.terminate(); + }, +}); + +// Deno.test({ +// name: "[worker_threads] Worker with relative path", +// async fn() { +// const worker = new workerThreads.Worker("./testdata/worker_threads.ts"); +// worker.postMessage("Hello, how are you my thread?"); +// assertEquals((await once(worker, "message"))[0], "I'm fine!"); +// worker.terminate(); +// }, +// });