Skip to content

Commit

Permalink
feat: removed worker deps
Browse files Browse the repository at this point in the history
  • Loading branch information
Venipa committed Nov 5, 2024
1 parent 7db5507 commit 2ce7574
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 21 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
"dependencies": {
"@electron-toolkit/preload": "^3.0.1",
"@electron-toolkit/utils": "^3.0.0",
"@giancosta86/worker-agent": "^1.1.1",
"@headlessui/vue": "^1.6.7",
"@paralleldrive/cuid2": "^2.2.2",
"@poppinss/utils": "^5.0.0",
Expand Down
6 changes: 4 additions & 2 deletions src/main/api/createApiWorker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { WorkerAgent } from "@giancosta86/worker-agent";
import ApiProvider from "@main/plugins/apiProvider.plugin";
import SettingsProvider from "@main/plugins/settingsProvider.plugin";
import { API_ROUTES } from "@main/utils/eventNames";
import { WorkerAgent } from "@main/utils/worker";
import logger from "@shared/utils/Logger";
import { BrowserWindow } from "electron";
import modulePathId from "./main?modulePath";
Expand Down Expand Up @@ -38,7 +38,9 @@ export const createApiWorker = async (
if (err) return logger.error(err);
if (!out || typeof out !== "object" || !out.name) return;
Promise.resolve((apiMap[out.name] as any)?.bind(api, ...[out.data].flat())()).then((result) => {
return Promise.resolve(worker?.runOperation({ name: "event", data: [out.id, result] }) ?? null);
return Promise.resolve(
worker?.runOperation({ name: "event", data: [out.id, result] }) ?? null,
);
});
});
return new (class {
Expand Down
3 changes: 3 additions & 0 deletions src/main/utils/worker/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export { PromiseAgent } from "./promise-agent";
export { WorkerAgent } from "./worker-agent";

115 changes: 115 additions & 0 deletions src/main/utils/worker/promise-agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { createId as generateOperationId } from "@paralleldrive/cuid2";
import { Worker, WorkerOptions } from "node:worker_threads";
import {
MessageFromWorker,
MessageToWorker,
WorkerData,
workerModuleId
} from "./protocol";

type PromiseResolver<T> = (value: T) => void;

type PromiseRejector = (err: Error) => void;

export class PromiseAgent<TInput, TOutput> {
private readonly worker: Worker;

private readonly operationPromiseHandlesByCorrelationId = new Map<
string,
[PromiseResolver<TOutput>, PromiseRejector]
>();

private workerError?: Error;

constructor(operationModuleId: string, logToConsole = false) {
const workerData: WorkerData = {
operationModuleId,
logToConsole
};

const workerOptions: WorkerOptions = {
workerData
};

this.worker = new Worker(workerModuleId, workerOptions)
.on("message", this.handleWorkerMessage.bind(this))
.on("error", this.handleWorkerError.bind(this))
.on("messageerror", this.handleWorkerError.bind(this));
}

private handleWorkerMessage(message: MessageFromWorker): void {
const correlationId = message.correlationId;
if (!correlationId) {
throw new Error("The message has no correlation id!");
}

const promiseHandles =
this.operationPromiseHandlesByCorrelationId.get(correlationId);

if (!promiseHandles) {
throw new Error(`Unknown correlation id: '${message.correlationId}'`);
}

this.operationPromiseHandlesByCorrelationId.delete(correlationId);

const [resolve, reject] = promiseHandles;

switch (message.type) {
case "operationOutput":
resolve(message.value as TOutput);
return;

case "error":
reject(new Error(message.formattedError));
return;
}
}

private handleWorkerError(err: Error): void {
this.workerError = err;

for (const [
,
reject
] of this.operationPromiseHandlesByCorrelationId.values()) {
reject(err);
}

this.operationPromiseHandlesByCorrelationId.clear();
}

runOperation(input: TInput): Promise<TOutput> {
if (this.workerError) {
return Promise.reject(this.workerError);
}

return new Promise<TOutput>((resolve, reject) => {
const correlationId = generateOperationId();

this.operationPromiseHandlesByCorrelationId.set(correlationId, [
resolve,
reject
]);

const message: MessageToWorker = {
type: "operationInput",
correlationId,
value: input
};
this.worker.postMessage(message);
});
}

requestExit(): Promise<number> {
if (this.workerError) {
return Promise.resolve(1);
}

return new Promise<number>(resolve => {
this.worker.on("exit", resolve);

const message: MessageToWorker = { type: "end" };
this.worker.postMessage(message);
});
}
}
29 changes: 29 additions & 0 deletions src/main/utils/worker/protocol.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { join } from "node:path";

export const workerModuleId = join(__dirname, "worker");

export type WorkerData = {
operationModuleId: string;
logToConsole: boolean;
};

export type MessageToWorker =
| {
type: "operationInput";
correlationId?: string;
value: unknown;
}
| {
type: "end";
};

export type MessageFromWorker = { correlationId?: string } & (
| {
type: "operationOutput";
value: unknown;
}
| {
type: "error";
formattedError: string;
}
);
104 changes: 104 additions & 0 deletions src/main/utils/worker/worker-agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { Worker, WorkerOptions } from "node:worker_threads";
import {
MessageFromWorker,
MessageToWorker,
WorkerData,
workerModuleId
} from "./protocol";

type GenericCallback = (...args: any[]) => void;

type WorkerEventSubscriptionMethod = (
workerEvent: string,
callback: GenericCallback
) => void;

export class WorkerAgent<TInput, TOutput> {
private readonly worker: Worker;

constructor(operationModuleId: string, logToConsole = false) {
const workerData: WorkerData = {
operationModuleId,
logToConsole
};

const workerOptions: WorkerOptions = {
workerData
};

this.worker = new Worker(workerModuleId, workerOptions);
}

runOperation(input: TInput): void {
const message: MessageToWorker = {
type: "operationInput",
value: input
};
this.worker.postMessage(message);
}

requestExit(): void {
const message: MessageToWorker = { type: "end" };
this.worker.postMessage(message);
}

on(
event: "result",
callback: (err: Error | null, output: TOutput | null) => void
): this;
on(event: "error", callback: (error: Error) => void): this;
on(event: "exit", callback: (exitCode: number) => void): this;

on(event: string, callback: GenericCallback): this {
return this.registerEventListener(this.worker.on, event, callback);
}

once(
event: "result",
callback: (err: Error | null, output: TOutput | null) => void
): this;
once(event: "error", callback: (error: Error) => void): this;
once(event: "exit", callback: (exitCode: number) => void): this;

once(event: string, callback: GenericCallback): this {
return this.registerEventListener(this.worker.once, event, callback);
}

private registerEventListener(
workerEventSubscriptionMethod: WorkerEventSubscriptionMethod,
agentEvent: string,
callback: GenericCallback
): this {
switch (agentEvent) {
case "result":
this.registerResultListener(workerEventSubscriptionMethod, callback);
break;

case "exit":
case "error":
workerEventSubscriptionMethod.call(this.worker, agentEvent, callback);
break;
}

return this;
}

private registerResultListener(
workerSubscriptionMethod: WorkerEventSubscriptionMethod,
callback: GenericCallback
): void {
workerSubscriptionMethod.call(
this.worker,
"message",
(message: MessageFromWorker) => {
switch (message.type) {
case "operationOutput":
return callback(null, message.value);

case "error":
return callback(new Error(message.formattedError), null);
}
}
);
}
}
18 changes: 0 additions & 18 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -559,19 +559,6 @@
resolved "https://registry.yarnpkg.com/@eslint/js/-/js-8.57.1.tgz#de633db3ec2ef6a3c89e2f19038063e8a122e2c2"
integrity sha512-d9zaMRSTIKDLhctzH12MtXvJKSSUhaHcjV+2Z+GK+EEY7XKpP5yR4x+N3TAcHTcu963nIr+TMcCb4DBCYX1z6Q==

"@giancosta86/format-error@^2.0.0":
version "2.1.0"
resolved "https://registry.yarnpkg.com/@giancosta86/format-error/-/format-error-2.1.0.tgz#7aa1eeeb16e123d7669b16eff0190d9da2d047b9"
integrity sha512-JINnfFtNlqhKne65KpNXLz17AMTQV2rKEhul/PYQeZK2buBRP3m4h2xO2ZhRcchJjADQHrEAbYPC2X4QYrnD9w==

"@giancosta86/worker-agent@^1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@giancosta86/worker-agent/-/worker-agent-1.1.1.tgz#bbfc15be0d632767c96ed033645ee4cf15852d6c"
integrity sha512-4GduFpjIkZjD33VkDwyyr823Dq9mQY8fFCBssO7UaUS5lFqHDqoirDnG7YirboY+DPkD0aNeXDziNguEaUut5Q==
dependencies:
"@giancosta86/format-error" "^2.0.0"
uuid "^8.3.2"

"@headlessui/vue@^1.6.7":
version "1.6.7"
resolved "https://registry.yarnpkg.com/@headlessui/vue/-/vue-1.6.7.tgz#433b86e5f86203606e7ac4851777173a327dd8c4"
Expand Down Expand Up @@ -7673,11 +7660,6 @@ utils-merge@1.0.1:
resolved "https://registry.yarnpkg.com/utils-merge/-/utils-merge-1.0.1.tgz#9f95710f50a267947b2ccc124741c1028427e713"
integrity sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA==

uuid@^8.3.2:
version "8.3.2"
resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2"
integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==

varint@^6.0.0:
version "6.0.0"
resolved "https://registry.yarnpkg.com/varint/-/varint-6.0.0.tgz#9881eb0ce8feaea6512439d19ddf84bf551661d0"
Expand Down

0 comments on commit 2ce7574

Please sign in to comment.