Skip to content

Commit

Permalink
feat: add new option to allow waiting thread start before thread-spaw…
Browse files Browse the repository at this point in the history
…n return (#116)
  • Loading branch information
toyobayashi authored May 8, 2024
1 parent 1d4b624 commit e1b1064
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 46 deletions.
1 change: 1 addition & 0 deletions packages/core/src/emnapi/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export declare type BaseCreateOptions = {
nodeBinding?: NodeBinding
reuseWorker?: boolean
asyncWorkPoolSize?: number
waitThreadStart?: boolean
onCreateWorker?: (info: CreateWorkerInfo) => any
print?: (str: string) => void
printErr?: (str: string) => void
Expand Down
51 changes: 43 additions & 8 deletions packages/core/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,27 @@ export class MessageHandler {
const onLoad = this.onLoad
if (type === 'load') {
if (this.instance !== undefined) return
const source = onLoad(payload)
let source: InstantiatedSource | Promise<InstantiatedSource>
try {
source = onLoad(payload)
} catch (err) {
onLoaded.call(this, err, null, payload)
return
}
const then = source && 'then' in source ? source.then : undefined
if (typeof then === 'function') {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
then.call(
source,
(source) => { onLoaded.call(this, source) },
(err) => { throw err }
(source) => { onLoaded.call(this, null, source, payload) },
(err) => { onLoaded.call(this, err, null, payload) }
)
} else {
onLoaded.call(this, source as InstantiatedSource)
onLoaded.call(this, null, source as InstantiatedSource, payload)
}
} else if (type === 'start') {
handleAfterLoad.call(this, e, () => {
notifyPthreadCreateResult(payload.sab, 1)
this.napiModule!.startThread(payload.tid, payload.arg)
})
} else if (type === 'async-worker-init') {
Expand All @@ -77,17 +84,45 @@ function handleAfterLoad (this: MessageHandler, e: any, f: (e: any) => void): vo
}
}

function onLoaded (this: MessageHandler, source: InstantiatedSource): void {
interface LoadPayload {
wasmModule: WebAssembly.Module
wasmMemory: WebAssembly.Memory
sab?: Int32Array
}

function notifyPthreadCreateResult (sab: Int32Array | undefined, result: number): void {
if (sab) {
Atomics.store(sab, 0, result)
Atomics.notify(sab, 0)
}
}

function onLoaded (this: MessageHandler, err: Error | null, source: InstantiatedSource | null, payload: LoadPayload): void {
if (err) {
notifyPthreadCreateResult(payload.sab, 2)
throw err
}

if (source == null) {
notifyPthreadCreateResult(payload.sab, 2)
throw new TypeError('onLoad should return an object')
}

const instance = source.instance
const napiModule = source.napiModule

if (!instance) throw new TypeError('onLoad should return an object which includes "instance"')
if (!napiModule) throw new TypeError('onLoad should return an object which includes "napiModule"')
if (!napiModule.childThread) throw new Error('napiModule should be created with `childThread: true`')
if (!instance) {
notifyPthreadCreateResult(payload.sab, 2)
throw new TypeError('onLoad should return an object which includes "instance"')
}
if (!napiModule) {
notifyPthreadCreateResult(payload.sab, 2)
throw new TypeError('onLoad should return an object which includes "napiModule"')
}
if (!napiModule.childThread) {
notifyPthreadCreateResult(payload.sab, 2)
throw new Error('napiModule should be created with `childThread: true`')
}

this.instance = instance
this.napiModule = napiModule
Expand Down
113 changes: 75 additions & 38 deletions packages/emnapi/src/core/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import { makeDynCall, to64 } from 'emscripten:parse-tools'

type SharedInt32Array = Int32Array

export interface InitOptions {
instance: WebAssembly.Instance
module: WebAssembly.Module
Expand Down Expand Up @@ -36,6 +38,7 @@ declare const process: any
export var ENVIRONMENT_IS_NODE = typeof process === 'object' && process !== null && typeof process.versions === 'object' && process.versions !== null && typeof process.versions.node === 'string'
export var ENVIRONMENT_IS_PTHREAD = Boolean(options.childThread)
export var reuseWorker = Boolean(options.reuseWorker)
export var waitThreadStart = Boolean(options.waitThreadStart)

export var wasmInstance: WebAssembly.Instance
export var wasmModule: WebAssembly.Module
Expand Down Expand Up @@ -251,7 +254,32 @@ function terminateWorker (worker: any): void {
}
}

function cleanThread (worker: any, tid: number, force?: boolean): void {
if (!force && reuseWorker) {
PThread.returnWorkerToPool(worker)
} else {
delete PThread.pthreads[tid]
const index = PThread.runningWorkers.indexOf(worker)
if (index !== -1) {
PThread.runningWorkers.splice(index, 1)
}
terminateWorker(worker)
delete worker.__emnapi_tid
}
}

function checkSharedWasmMemory (): void {
if (typeof SharedArrayBuffer === 'undefined' || !(wasmMemory.buffer instanceof SharedArrayBuffer)) {
throw new Error(
'Multithread features require shared wasm memory. ' +
'Try to compile with `-matomics -mbulk-memory` and use `--import-memory --shared-memory` during linking'
)
}
}

function spawnThread (startArg: number, errorOrTid: number): number {
checkSharedWasmMemory()

const isNewABI = errorOrTid !== undefined
if (!isNewABI) {
errorOrTid = _malloc(to64('8'))
Expand Down Expand Up @@ -284,12 +312,44 @@ function spawnThread (startArg: number, errorOrTid: number): number {
return isError ? -result : result
}

let sab: Int32Array | undefined
if (waitThreadStart) {
sab = new Int32Array(new SharedArrayBuffer(4))
Atomics.store(sab, 0, 0)
}

let worker: any
const tid = PThread.nextWorkerID + 43
try {
worker = PThread.getNewWorker()
worker = PThread.getNewWorker(sab)
if (!worker) {
throw new Error('failed to get new worker')
}

const WASI_THREADS_MAX_TID = 0x1FFFFFFF
PThread.nextWorkerID = (PThread.nextWorkerID + 1) % (WASI_THREADS_MAX_TID - 42)
PThread.pthreads[tid] = worker
worker.__emnapi_tid = tid
if (ENVIRONMENT_IS_NODE) {
worker.ref()
}
worker.postMessage({
__emnapi__: {
type: 'start',
payload: {
tid,
arg: startArg,
sab
}
}
})
if (waitThreadStart) {
Atomics.wait(sab!, 0, 0)
const r = Atomics.load(sab!, 0)
if (r === 2) {
throw new Error('failed to start pthread')
}
}
} catch (e) {
const EAGAIN = 6

Expand All @@ -305,31 +365,19 @@ function spawnThread (startArg: number, errorOrTid: number): number {
return -EAGAIN
}

const tid = PThread.nextWorkerID + 43

Atomics.store(struct, 0, 0)
Atomics.store(struct, 1, tid)
Atomics.notify(struct, 1)

const WASI_THREADS_MAX_TID = 0x1FFFFFFF
PThread.nextWorkerID = (PThread.nextWorkerID + 1) % (WASI_THREADS_MAX_TID - 42)
PThread.pthreads[tid] = worker
worker.__emnapi_tid = tid
PThread.runningWorkers.push(worker)
if (ENVIRONMENT_IS_NODE) {
worker.ref()
if (!waitThreadStart) {
worker.whenLoaded.catch((err: any) => {
delete worker.whenLoaded
cleanThread(worker, tid, true)
throw err
})
}

worker.postMessage({
__emnapi__: {
type: 'start',
payload: {
tid,
arg: startArg
}
}
})

if (isNewABI) {
return 0
}
Expand Down Expand Up @@ -376,7 +424,7 @@ export var PThread = {
worker.unref()
}
},
loadWasmModuleToWorker: (worker: any) => {
loadWasmModuleToWorker: (worker: any, sab?: SharedInt32Array) => {
if (worker.whenLoaded) return worker.whenLoaded
worker.whenLoaded = new Promise<any>((resolve, reject) => {
worker.onmessage = function (e: any) {
Expand All @@ -395,14 +443,7 @@ export var PThread = {
} else if (type === 'spawn-thread') {
spawnThread(payload.startArg, payload.errorOrTid)
} else if (type === 'cleanup-thread') {
if (reuseWorker) {
PThread.returnWorkerToPool(worker)
} else {
delete PThread.pthreads[payload.tid]
PThread.runningWorkers.splice(PThread.runningWorkers.indexOf(worker), 1)
terminateWorker(worker)
delete worker.__emnapi_tid
}
cleanThread(worker, payload.tid)
}
}
}
Expand Down Expand Up @@ -437,17 +478,13 @@ export var PThread = {
type: 'load',
payload: {
wasmModule,
wasmMemory
wasmMemory,
sab
}
}
})
} catch (err) {
if (typeof SharedArrayBuffer === 'undefined' || !(wasmMemory.buffer instanceof SharedArrayBuffer)) {
throw new Error(
'Multithread features require shared wasm memory. ' +
'Try to compile with `-matomics -mbulk-memory` and use `--import-memory --shared-memory` during linking'
)
}
checkSharedWasmMemory()
throw err
}
})
Expand All @@ -461,16 +498,16 @@ export var PThread = {
PThread.unusedWorkers.push(worker)
return worker
},
getNewWorker () {
getNewWorker (sab?: SharedInt32Array) {
if (reuseWorker) {
if (PThread.unusedWorkers.length === 0) {
const worker = PThread.allocateUnusedWorker()
PThread.loadWasmModuleToWorker(worker)
PThread.loadWasmModuleToWorker(worker, sab)
}
return PThread.unusedWorkers.pop()
}
const worker = PThread.allocateUnusedWorker()
PThread.loadWasmModuleToWorker(worker)
PThread.loadWasmModuleToWorker(worker, sab)
return PThread.unusedWorkers.pop()
}
}
1 change: 1 addition & 0 deletions packages/emnapi/src/core/scope.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ declare interface CreateOptions {
childThread?: boolean
reuseWorker?: boolean
asyncWorkPoolSize?: number
waitThreadStart?: boolean
onCreateWorker?: () => any
print?: (str: string) => void
printErr?: (str: string) => void
Expand Down
1 change: 1 addition & 0 deletions packages/test/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ function loadPath (request, options) {
: -RUNTIME_UV_THREADPOOL_SIZE,
filename: request,
reuseWorker: true,
waitThreadStart: true,
onCreateWorker () {
return new Worker(join(__dirname, './worker.js'), {
env: process.env,
Expand Down

0 comments on commit e1b1064

Please sign in to comment.