diff --git a/package.json b/package.json index d229075..96b6b22 100644 --- a/package.json +++ b/package.json @@ -24,7 +24,6 @@ "dependencies": { "@electron-toolkit/preload": "^3.0.1", "@electron-toolkit/utils": "^3.0.0", - "@giancosta86/worker-agent": "^1.1.1", "@headlessui/vue": "^1.6.7", "@paralleldrive/cuid2": "^2.2.2", "@poppinss/utils": "^5.0.0", diff --git a/src/main/api/createApiWorker.ts b/src/main/api/createApiWorker.ts index 6c53555..996a80f 100644 --- a/src/main/api/createApiWorker.ts +++ b/src/main/api/createApiWorker.ts @@ -1,7 +1,7 @@ -import { WorkerAgent } from "@giancosta86/worker-agent"; import ApiProvider from "@main/plugins/apiProvider.plugin"; import SettingsProvider from "@main/plugins/settingsProvider.plugin"; import { API_ROUTES } from "@main/utils/eventNames"; +import { WorkerAgent } from "@main/utils/worker"; import logger from "@shared/utils/Logger"; import { BrowserWindow } from "electron"; import modulePathId from "./main?modulePath"; @@ -38,7 +38,9 @@ export const createApiWorker = async ( if (err) return logger.error(err); if (!out || typeof out !== "object" || !out.name) return; Promise.resolve((apiMap[out.name] as any)?.bind(api, ...[out.data].flat())()).then((result) => { - return Promise.resolve(worker?.runOperation({ name: "event", data: [out.id, result] }) ?? null); + return Promise.resolve( + worker?.runOperation({ name: "event", data: [out.id, result] }) ?? null, + ); }); }); return new (class { diff --git a/src/main/utils/worker/index.ts b/src/main/utils/worker/index.ts new file mode 100644 index 0000000..ee241ba --- /dev/null +++ b/src/main/utils/worker/index.ts @@ -0,0 +1,3 @@ +export { PromiseAgent } from "./promise-agent"; +export { WorkerAgent } from "./worker-agent"; + diff --git a/src/main/utils/worker/promise-agent.ts b/src/main/utils/worker/promise-agent.ts new file mode 100644 index 0000000..0d760bb --- /dev/null +++ b/src/main/utils/worker/promise-agent.ts @@ -0,0 +1,115 @@ +import { createId as generateOperationId } from "@paralleldrive/cuid2"; +import { Worker, WorkerOptions } from "node:worker_threads"; +import { + MessageFromWorker, + MessageToWorker, + WorkerData, + workerModuleId +} from "./protocol"; + +type PromiseResolver = (value: T) => void; + +type PromiseRejector = (err: Error) => void; + +export class PromiseAgent { + private readonly worker: Worker; + + private readonly operationPromiseHandlesByCorrelationId = new Map< + string, + [PromiseResolver, PromiseRejector] + >(); + + private workerError?: Error; + + constructor(operationModuleId: string, logToConsole = false) { + const workerData: WorkerData = { + operationModuleId, + logToConsole + }; + + const workerOptions: WorkerOptions = { + workerData + }; + + this.worker = new Worker(workerModuleId, workerOptions) + .on("message", this.handleWorkerMessage.bind(this)) + .on("error", this.handleWorkerError.bind(this)) + .on("messageerror", this.handleWorkerError.bind(this)); + } + + private handleWorkerMessage(message: MessageFromWorker): void { + const correlationId = message.correlationId; + if (!correlationId) { + throw new Error("The message has no correlation id!"); + } + + const promiseHandles = + this.operationPromiseHandlesByCorrelationId.get(correlationId); + + if (!promiseHandles) { + throw new Error(`Unknown correlation id: '${message.correlationId}'`); + } + + this.operationPromiseHandlesByCorrelationId.delete(correlationId); + + const [resolve, reject] = promiseHandles; + + switch (message.type) { + case "operationOutput": + resolve(message.value as TOutput); + return; + + case "error": + reject(new Error(message.formattedError)); + return; + } + } + + private handleWorkerError(err: Error): void { + this.workerError = err; + + for (const [ + , + reject + ] of this.operationPromiseHandlesByCorrelationId.values()) { + reject(err); + } + + this.operationPromiseHandlesByCorrelationId.clear(); + } + + runOperation(input: TInput): Promise { + if (this.workerError) { + return Promise.reject(this.workerError); + } + + return new Promise((resolve, reject) => { + const correlationId = generateOperationId(); + + this.operationPromiseHandlesByCorrelationId.set(correlationId, [ + resolve, + reject + ]); + + const message: MessageToWorker = { + type: "operationInput", + correlationId, + value: input + }; + this.worker.postMessage(message); + }); + } + + requestExit(): Promise { + if (this.workerError) { + return Promise.resolve(1); + } + + return new Promise(resolve => { + this.worker.on("exit", resolve); + + const message: MessageToWorker = { type: "end" }; + this.worker.postMessage(message); + }); + } +} \ No newline at end of file diff --git a/src/main/utils/worker/protocol.ts b/src/main/utils/worker/protocol.ts new file mode 100644 index 0000000..4704c04 --- /dev/null +++ b/src/main/utils/worker/protocol.ts @@ -0,0 +1,29 @@ +import { join } from "node:path"; + +export const workerModuleId = join(__dirname, "worker"); + +export type WorkerData = { + operationModuleId: string; + logToConsole: boolean; +}; + +export type MessageToWorker = + | { + type: "operationInput"; + correlationId?: string; + value: unknown; + } + | { + type: "end"; + }; + +export type MessageFromWorker = { correlationId?: string } & ( + | { + type: "operationOutput"; + value: unknown; + } + | { + type: "error"; + formattedError: string; + } +); \ No newline at end of file diff --git a/src/main/utils/worker/worker-agent.ts b/src/main/utils/worker/worker-agent.ts new file mode 100644 index 0000000..cf48a1d --- /dev/null +++ b/src/main/utils/worker/worker-agent.ts @@ -0,0 +1,104 @@ +import { Worker, WorkerOptions } from "node:worker_threads"; +import { + MessageFromWorker, + MessageToWorker, + WorkerData, + workerModuleId +} from "./protocol"; + +type GenericCallback = (...args: any[]) => void; + +type WorkerEventSubscriptionMethod = ( + workerEvent: string, + callback: GenericCallback +) => void; + +export class WorkerAgent { + private readonly worker: Worker; + + constructor(operationModuleId: string, logToConsole = false) { + const workerData: WorkerData = { + operationModuleId, + logToConsole + }; + + const workerOptions: WorkerOptions = { + workerData + }; + + this.worker = new Worker(workerModuleId, workerOptions); + } + + runOperation(input: TInput): void { + const message: MessageToWorker = { + type: "operationInput", + value: input + }; + this.worker.postMessage(message); + } + + requestExit(): void { + const message: MessageToWorker = { type: "end" }; + this.worker.postMessage(message); + } + + on( + event: "result", + callback: (err: Error | null, output: TOutput | null) => void + ): this; + on(event: "error", callback: (error: Error) => void): this; + on(event: "exit", callback: (exitCode: number) => void): this; + + on(event: string, callback: GenericCallback): this { + return this.registerEventListener(this.worker.on, event, callback); + } + + once( + event: "result", + callback: (err: Error | null, output: TOutput | null) => void + ): this; + once(event: "error", callback: (error: Error) => void): this; + once(event: "exit", callback: (exitCode: number) => void): this; + + once(event: string, callback: GenericCallback): this { + return this.registerEventListener(this.worker.once, event, callback); + } + + private registerEventListener( + workerEventSubscriptionMethod: WorkerEventSubscriptionMethod, + agentEvent: string, + callback: GenericCallback + ): this { + switch (agentEvent) { + case "result": + this.registerResultListener(workerEventSubscriptionMethod, callback); + break; + + case "exit": + case "error": + workerEventSubscriptionMethod.call(this.worker, agentEvent, callback); + break; + } + + return this; + } + + private registerResultListener( + workerSubscriptionMethod: WorkerEventSubscriptionMethod, + callback: GenericCallback + ): void { + workerSubscriptionMethod.call( + this.worker, + "message", + (message: MessageFromWorker) => { + switch (message.type) { + case "operationOutput": + return callback(null, message.value); + + case "error": + return callback(new Error(message.formattedError), null); + } + } + ); + } +} \ No newline at end of file diff --git a/yarn.lock b/yarn.lock index dfbbcc2..038cef7 100644 --- a/yarn.lock +++ b/yarn.lock @@ -559,19 +559,6 @@ resolved "https://registry.yarnpkg.com/@eslint/js/-/js-8.57.1.tgz#de633db3ec2ef6a3c89e2f19038063e8a122e2c2" integrity sha512-d9zaMRSTIKDLhctzH12MtXvJKSSUhaHcjV+2Z+GK+EEY7XKpP5yR4x+N3TAcHTcu963nIr+TMcCb4DBCYX1z6Q== -"@giancosta86/format-error@^2.0.0": - version "2.1.0" - resolved "https://registry.yarnpkg.com/@giancosta86/format-error/-/format-error-2.1.0.tgz#7aa1eeeb16e123d7669b16eff0190d9da2d047b9" - integrity sha512-JINnfFtNlqhKne65KpNXLz17AMTQV2rKEhul/PYQeZK2buBRP3m4h2xO2ZhRcchJjADQHrEAbYPC2X4QYrnD9w== - -"@giancosta86/worker-agent@^1.1.1": - version "1.1.1" - resolved "https://registry.yarnpkg.com/@giancosta86/worker-agent/-/worker-agent-1.1.1.tgz#bbfc15be0d632767c96ed033645ee4cf15852d6c" - integrity sha512-4GduFpjIkZjD33VkDwyyr823Dq9mQY8fFCBssO7UaUS5lFqHDqoirDnG7YirboY+DPkD0aNeXDziNguEaUut5Q== - dependencies: - "@giancosta86/format-error" "^2.0.0" - uuid "^8.3.2" - "@headlessui/vue@^1.6.7": version "1.6.7" resolved "https://registry.yarnpkg.com/@headlessui/vue/-/vue-1.6.7.tgz#433b86e5f86203606e7ac4851777173a327dd8c4" @@ -7673,11 +7660,6 @@ utils-merge@1.0.1: resolved "https://registry.yarnpkg.com/utils-merge/-/utils-merge-1.0.1.tgz#9f95710f50a267947b2ccc124741c1028427e713" integrity sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA== -uuid@^8.3.2: - version "8.3.2" - resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2" - integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg== - varint@^6.0.0: version "6.0.0" resolved "https://registry.yarnpkg.com/varint/-/varint-6.0.0.tgz#9881eb0ce8feaea6512439d19ddf84bf551661d0"