Skip to content

Commit

Permalink
feat: add workerize for cleaner interop with workers (#2321)
Browse files Browse the repository at this point in the history
* feat: add workerize as a worker loader

* fix: transfer large array instead of copy when sending/getting data from workers

* feat: consume workerize-transferable to add transferable capability to workerize-loader

* chore: remove copywrite headers for adopted code, remove dead code

* fix: remove self-assignment

* fix: add webworker lib for typedoc generation

Co-authored-by: cognite-bulldozer[bot] <51074376+cognite-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
christjt and cognite-bulldozer[bot] authored Aug 5, 2022
1 parent af815bc commit 1ce4078
Show file tree
Hide file tree
Showing 16 changed files with 588 additions and 154 deletions.
3 changes: 2 additions & 1 deletion viewer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
"process": "^0.11.10",
"random-seed": "0.3.0",
"raw-loader": "^4.0.2",
"remove-files-webpack-plugin": "^1.5.0",
"rimraf": "3.0.2",
"run-script-os": "1.1.6",
"shx": "0.3.4",
Expand All @@ -122,7 +123,7 @@
"webpack-log": "^3.0.2",
"webpack-node-externals": "^3.0.0",
"whatwg-fetch": "3.6.2",
"worker-loader": "^3.0.8"
"workerize-loader": "2.0.2"
},
"peerDependencies": {
"@cognite/sdk": "^7.7.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,22 @@

import * as THREE from 'three';

import { AutoTerminatingWorker, WorkerPool } from '../utils/WorkerPool';
import { WorkerPool } from '../utils/WorkerPool';
import { ILoader } from './ILoader';
import { ModelDataProvider } from '@reveal/modeldata-api';
import { PointCloudEptGeometryNode } from '../geometry/PointCloudEptGeometryNode';
import EptDecoderWorker from '../workers/eptBinaryDecoder.worker';
import { ParseCommand, ObjectsCommand } from '../workers/eptBinaryDecoder.worker';
import * as EptDecoderWorker from '../workers/eptBinaryDecoder.worker';

import { ParsedEptData, EptInputData } from '../workers/parseEpt';

import { fromThreeVector3 } from '@reveal/utilities';
import { fromThreeVector3, setupTransferableMethodsOnMain } from '@reveal/utilities';
import { RawStylableObject } from '../../styling/StylableObject';

export class EptBinaryLoader implements ILoader {
private readonly _dataLoader: ModelDataProvider;
private readonly _stylableObjects: RawStylableObject[];

static readonly WORKER_POOL = new WorkerPool(32, EptDecoderWorker);
static readonly WORKER_POOL = new WorkerPool(32, EptDecoderWorker as unknown as new () => Worker);

extension(): string {
return '.bin';
Expand Down Expand Up @@ -54,25 +53,26 @@ export class EptBinaryLoader implements ILoader {

async parse(node: PointCloudEptGeometryNode, data: ArrayBuffer): Promise<ParsedEptData> {
const autoTerminatingWorker = await EptBinaryLoader.WORKER_POOL.getWorker();

return new Promise<ParsedEptData>(res => {
autoTerminatingWorker.worker.onmessage = (e: { data: ParsedEptData }) => {
EptBinaryLoader.WORKER_POOL.releaseWorker(autoTerminatingWorker);
res(e.data);
};

postStylableObjectInfo(autoTerminatingWorker, node, this._stylableObjects);

const eptData: EptInputData = {
buffer: data,
schema: node.ept.schema,
scale: node.ept.eptScale,
offset: node.ept.eptOffset,
mins: fromThreeVector3(node.key.b.min)
};

postParseCommand(autoTerminatingWorker, eptData);
const eptDecoderWorker = autoTerminatingWorker.worker as unknown as typeof EptDecoderWorker;
const eptData: EptInputData = {
buffer: data,
schema: node.ept.schema,
scale: node.ept.eptScale,
offset: node.ept.eptOffset,
mins: fromThreeVector3(node.key.b.min)
};

setupTransferableMethodsOnMain(autoTerminatingWorker.worker, {
parse: {
pickTransferablesFromParams: (params: any) => {
return params.buffer;
}
}
});

const result = await eptDecoderWorker.parse(eptData, this._stylableObjects, node.boundingBox.min.toArray());
EptBinaryLoader.WORKER_POOL.releaseWorker(autoTerminatingWorker);
return result;
}
}

Expand All @@ -83,31 +83,6 @@ function createTightBoundingBox(data: ParsedEptData): THREE.Box3 {
);
}

function postParseCommand(autoTerminatingWorker: AutoTerminatingWorker, data: EptInputData) {
const parseMessage: ParseCommand = {
type: 'parse',
data
};

autoTerminatingWorker.worker.postMessage(parseMessage, [parseMessage.data.buffer]);
}

function postStylableObjectInfo(
autoTerminatingWorker: AutoTerminatingWorker,
node: PointCloudEptGeometryNode,
stylableObjects: RawStylableObject[]
): void {
const offsetVec = node.boundingBox.min;

const objectMessage: ObjectsCommand = {
type: 'objects',
objects: stylableObjects,
pointOffset: [offsetVec.x, offsetVec.y, offsetVec.z] as [number, number, number]
};

autoTerminatingWorker.worker.postMessage(objectMessage);
}

function createGeometryFromEptData(data: ParsedEptData): THREE.BufferGeometry {
const geometry = new THREE.BufferGeometry();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,40 @@
* Copyright 2022 Cognite AS
*/

import { RawStylableObject, StylableObject, rawToStylableObject } from '../../styling/StylableObject';
import { RawStylableObject, rawToStylableObject } from '../../styling/StylableObject';

import { parseEpt, EptInputData } from './parseEpt';
import { parseEpt, EptInputData, ParsedEptData } from './parseEpt';
import { Vec3 } from '../../styling/shapes/linalg';

const ctx: Worker = self as any;

let objectList: StylableObject[] = [];
let pointOffset: Vec3 = [0, 0, 0];

type CommandType = 'objects' | 'parse';

export interface ICommand {
type: CommandType;
}

export type ObjectsCommand = {
type: 'objects';
objects: RawStylableObject[];
pointOffset: Vec3;
};

export type ParseCommand = {
type: 'parse';
data: EptInputData;
};

ctx.onmessage = function (event: MessageEvent<ICommand>) {
const command = event.data as ICommand;

switch (command.type) {
case 'objects':
const objectsCommand = command as ObjectsCommand;
objectList = objectsCommand.objects.map(rawToStylableObject);
pointOffset = objectsCommand.pointOffset;
break;
case 'parse':
const parseCommand = command as ParseCommand;
parseEpt(ctx, parseCommand.data, objectList, pointOffset);
break;
default:
console.error('Unrecognized eptBinaryDecoder worker command');
import { setupTransferableMethodsOnWorker } from '@reveal/utilities';

setupTransferableMethodsOnWorker({
parse: {
fn: parse,
pickTransferablesFromResult: (result: ParsedEptData) => {
return [
result.position,
result.color,
result.intensity,
result.classification,
result.returnNumber,
result.numberOfReturns,
result.pointSourceId,
result.indices,
result.objectId
].filter(assertDefined);
}
}
};
});

export async function parse(
data: EptInputData,
objects: RawStylableObject[],
pointOffset: Vec3
): Promise<ParsedEptData> {
const objectList = objects.map(rawToStylableObject);
return parseEpt(data, objectList, pointOffset);
}

export default null as any;
function assertDefined(buffer: ArrayBuffer | undefined): buffer is ArrayBuffer {
return buffer !== undefined;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export type EptInputData = {
mins: [number, number, number];
};

export function parseEpt(worker: Worker, data: EptInputData, objects: StylableObject[], pointOffset: Vec3): void {
export function parseEpt(data: EptInputData, objects: StylableObject[], pointOffset: Vec3): ParsedEptData {
const buffer = data.buffer;
const view = new DataView(buffer);
const schema: SchemaEntry[] = data.schema;
Expand Down Expand Up @@ -259,21 +259,5 @@ export function parseEpt(worker: Worker, data: EptInputData, objects: StylableOb
objectId: objectIdBuffer
};

function assertDefined(buffer: ArrayBuffer | undefined): buffer is ArrayBuffer {
return buffer !== undefined;
}

const transferables = [
message.position,
message.color,
message.intensity,
message.classification,
message.returnNumber,
message.numberOfReturns,
message.pointSourceId,
message.indices,
message.objectId
].filter(assertDefined);

worker.postMessage(message, transferables);
return message;
}
2 changes: 2 additions & 0 deletions viewer/packages/utilities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ export { worldToNormalizedViewportCoordinates, worldToViewportCoordinates } from
export { DeferredPromise } from './src/DeferredPromise';

export { SceneHandler } from './src/SceneHandler';

export * from './src/workers/workerize-transferable';
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*!
* Copyright 2022 Cognite AS
*/

module.exports = {
'rules': {
'header/header': 'off'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This is a consumed library from https://github.com/naoak/workerize-transferable

The source code has been integrated such that ESM modules are properly translated in jest.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*!
* Adopted from https://github.com/naoak/workerize-transferable
*/
export * from './message-types';
export * from './on-main';
export * from './on-worker';
export * from './util';
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/*!
* Adopted from https://github.com/naoak/workerize-transferable
*/

export const MESSAGE_TYPE_RPC_TRANSFERABLE = 'RPC_transferable';
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*!
* Adopted from https://github.com/naoak/workerize-transferable
*/
import { MESSAGE_TYPE_RPC_TRANSFERABLE } from './message-types';

type Resolve = (value?: unknown) => void;
type Reject = (reason?: any) => void;

/** Options for worker method params */
export type WorkerMethodParamsOptions = {
/** pick transferables from method params */
pickTransferablesFromParams?: (params: any) => any[];
};

/**
* Setup worker methods which receive transferables from worker method params. This function should be executed on the main thread.
* @param worker worker instance
* @param methods an object whose key is method name and whose value is options how to pick transferables from method params
*/
export function setupTransferableMethodsOnMain<WORKER extends Worker>(
worker: WORKER,
methods: {
[x: string]: WorkerMethodParamsOptions;
}
): void {
let c = 0;
const callbacks: {
[x: number]: [Resolve, Reject];
} = {};
worker.addEventListener('message', e => {
const d = e.data;
if (d.type !== MESSAGE_TYPE_RPC_TRANSFERABLE) {
return;
}
const f = callbacks[d.id];
if (f) {
delete callbacks[d.id];
if (d.error) {
f[1](Object.assign(Error(d.error.message), d.error));
} else {
f[0](d.result);
}
}
});
Object.keys(methods).forEach(method => {
(worker as any)[method] = (...params: any[]) =>
new Promise((a: Resolve, b: Reject) => {
const id = ++c;
callbacks[id] = [a, b];
const opts = methods[method];
worker.postMessage(
{ type: MESSAGE_TYPE_RPC_TRANSFERABLE, id, method, params },
opts.pickTransferablesFromParams ? opts.pickTransferablesFromParams(params) : []
);
});
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*!
* Adopted from https://github.com/naoak/workerize-transferable
*/
import { MESSAGE_TYPE_RPC_TRANSFERABLE } from './message-types';
import { getGlobalThis } from './util';

/** Options for worker method result */
export type WorkerMethodResultOptions = {
/** worker method */
fn: (...params: any[]) => any;

/** pick transferables from method result */
pickTransferablesFromResult?: (result: any) => any[];
};

/**
* Setup worker methods which return transferables. This function should be executed on the worker thread.
* @param methods an object whose key is method name and whose value is options how to pick transferables from method result
*/
export function setupTransferableMethodsOnWorker(methods: { [x: string]: WorkerMethodResultOptions }): void {
const globals = getGlobalThis();
globals.addEventListener('message', e => {
const { type, method, id, params } = e.data;
let opts: WorkerMethodResultOptions;
let p: Promise<any>;
if (type === MESSAGE_TYPE_RPC_TRANSFERABLE && method) {
if ((opts = methods[method])) {
p = Promise.resolve().then(() => opts.fn(...params));
} else {
p = Promise.reject('No such method');
}
p.then(result => {
globals.postMessage(
{ type: MESSAGE_TYPE_RPC_TRANSFERABLE, id, result },
opts.pickTransferablesFromResult ? opts.pickTransferablesFromResult(result) : []
);
}).catch(e => {
let message;
try {
message = e.message.toString();
} catch (ex) {
message = null;
}
const error: any = { message };
if (e.stack) {
error.stack = e.stack;
error.name = e.name;
}
if (e.status) {
error.status = e.status;
error.responseJson = e.responseJson;
}
globals.postMessage({
type: MESSAGE_TYPE_RPC_TRANSFERABLE,
id,
error
});
});
}
});
}
Loading

0 comments on commit 1ce4078

Please sign in to comment.