Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for iterators #732

Open
wants to merge 13 commits into
base: current
Choose a base branch
from
2 changes: 1 addition & 1 deletion .github/workflows/nodejs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
node-version: [18.x, 20.x, 22.x]
node-version: [20.x, 22.x]
runs-on: ${{matrix.os}}
steps:
- uses: actions/checkout@v4
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
node_modules
dist
coverage
tsconfig.tsbuildinfo
93 changes: 47 additions & 46 deletions docs/docs/api-reference/class.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,55 +55,54 @@ This class extends [`EventEmitter`](https://nodejs.org/api/events.html) from Nod
returning control to the main thread (avoid having open handles within a thread). If still want to have the possibility
of having open handles or handle asynchrnous tasks, you can set the environment variable `PISCINA_ENABLE_ASYNC_ATOMICS` to `1` or setting `options.atomics` to `async`.

** :::info
:::info
**Note**: The `async` mode comes with performance penalties and can lead to undesired behaviour if open handles are not tracked correctly.
Workers should be designed to wait for all operations to finish before returning control to the main thread, if any background operations are still running
`async` can be of help (e.g. for cache warming, etc).
:::
**
- `resourceLimits`: (`object`) See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options)
- `maxOldGenerationSizeMb`: (`number`) The maximum size of each worker threads
main heap in MB.
- `maxYoungGenerationSizeMb`: (`number`) The maximum size of a heap space for
recently created objects.
- `codeRangeSizeMb`: (`number`) The size of a pre-allocated memory range used
for generated code.
- `stackSizeMb` : (`number`) The default maximum stack size for the thread.
Small values may lead to unusable Worker instances. Default: 4
- `env`: (`object`) If set, specifies the initial value of `process.env` inside
the worker threads. See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for details.
- `argv`: (`any[]`) List of arguments that will be stringified and appended to
`process.argv` in the worker. See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for details.
- `execArgv`: (`string[]`) List of Node.js CLI options passed to the worker.
See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for details.
- `workerData`: (`any`) Any JavaScript value that can be cloned and made
available as `require('piscina').workerData`. See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options)
for details. Unlike regular Node.js Worker Threads, `workerData` must not
specify any value requiring a `transferList`. This is because the `workerData`
will be cloned for each pooled worker.
- `taskQueue`: (`TaskQueue`) By default, Piscina uses a first-in-first-out
queue for submitted tasks. The `taskQueue` option can be used to provide an
alternative implementation. See [Custom Task Queues](https://github.com/piscinajs/piscina#custom_task_queues) for additional detail.
- `niceIncrement`: (`number`) An optional value that decreases priority for
the individual threads, i.e. the higher the value, the lower the priority
of the Worker threads. This value is used on Unix/Windows and requires the
optional [`@napi-rs/nice`](https://npmjs.org/package/@napi-rs/nice) module to be installed.
See [`nice(2)`](https://linux.die.net/man/2/nice) and [`SetThreadPriority`](https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority) for more details.
- `trackUnmanagedFds`: (`boolean`) An optional setting that, when `true`, will
cause Workers to track file descriptors managed using `fs.open()` and
`fs.close()`, and will close them automatically when the Worker exits.
Defaults to `true`. (This option is only supported on Node.js 12.19+ and
all Node.js versions higher than 14.6.0).
- `closeTimeout`: (`number`) An optional time (in milliseconds) to wait for the pool to
complete all in-flight tasks when `close()` is called. The default is `30000`
- `recordTiming`: (`boolean`) By default, run and wait time will be recorded
for the pool. To disable, set to `false`.
- `workerHistogram`: (`boolean`) By default `false`. It will hint the Worker pool to record statistics for each individual Worker
- `loadBalancer`: ([`PiscinaLoadBalancer`](#piscinaloadbalancer)) By default, Piscina uses a least-busy algorithm. The `loadBalancer`
option can be used to provide an alternative implementation. See [Custom Load Balancers](../advanced-topics/loadbalancer.mdx) for additional detail.
- `workerHistogram`: (`boolean`) By default `false`. It will hint the Worker pool to record statistics for each individual Worker
- `loadBalancer`: ([`PiscinaLoadBalancer`](#piscinaloadbalancer)) By default, Piscina uses a least-busy algorithm. The `loadBalancer`
option can be used to provide an alternative implementation. See [Custom Load Balancers](../advanced-topics/loadbalancer.mdx) for additional detail.
- `resourceLimits`: (`object`) See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options)
- `maxOldGenerationSizeMb`: (`number`) The maximum size of each worker threads
main heap in MB.
- `maxYoungGenerationSizeMb`: (`number`) The maximum size of a heap space for
recently created objects.
- `codeRangeSizeMb`: (`number`) The size of a pre-allocated memory range used
for generated code.
- `stackSizeMb` : (`number`) The default maximum stack size for the thread.
Small values may lead to unusable Worker instances. Default: 4
- `env`: (`object`) If set, specifies the initial value of `process.env` inside
the worker threads. See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for details.
- `argv`: (`any[]`) List of arguments that will be stringified and appended to
`process.argv` in the worker. See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for details.
- `execArgv`: (`string[]`) List of Node.js CLI options passed to the worker.
See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for details.
- `workerData`: (`any`) Any JavaScript value that can be cloned and made
available as `require('piscina').workerData`. See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options)
for details. Unlike regular Node.js Worker Threads, `workerData` must not
specify any value requiring a `transferList`. This is because the `workerData`
will be cloned for each pooled worker.
- `taskQueue`: (`TaskQueue`) By default, Piscina uses a first-in-first-out
queue for submitted tasks. The `taskQueue` option can be used to provide an
alternative implementation. See [Custom Task Queues](https://github.com/piscinajs/piscina#custom_task_queues) for additional detail.
- `niceIncrement`: (`number`) An optional value that decreases priority for
the individual threads, i.e. the higher the value, the lower the priority
of the Worker threads. This value is used on Unix/Windows and requires the
optional [`@napi-rs/nice`](https://npmjs.org/package/@napi-rs/nice) module to be installed.
See [`nice(2)`](https://linux.die.net/man/2/nice) and [`SetThreadPriority`](https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority) for more details.
- `trackUnmanagedFds`: (`boolean`) An optional setting that, when `true`, will
cause Workers to track file descriptors managed using `fs.open()` and
`fs.close()`, and will close them automatically when the Worker exits.
Defaults to `true`. (This option is only supported on Node.js 12.19+ and
all Node.js versions higher than 14.6.0).
- `closeTimeout`: (`number`) An optional time (in milliseconds) to wait for the pool to
complete all in-flight tasks when `close()` is called. The default is `30000`
- `recordTiming`: (`boolean`) By default, run and wait time will be recorded
for the pool. To disable, set to `false`.
- `workerHistogram`: (`boolean`) By default `false`. It will hint the Worker pool to record statistics for each individual Worker
- `loadBalancer`: ([`PiscinaLoadBalancer`](#piscinaloadbalancer)) By default, Piscina uses a least-busy algorithm. The `loadBalancer`
option can be used to provide an alternative implementation. See [Custom Load Balancers](../advanced-topics/loadbalancer.mdx) for additional detail.
- `workerHistogram`: (`boolean`) By default `false`. It will hint the Worker pool to record statistics for each individual Worker
- `loadBalancer`: ([`PiscinaLoadBalancer`](#piscinaloadbalancer)) By default, Piscina uses a least-busy algorithm. The `loadBalancer`
option can be used to provide an alternative implementation. See [Custom Load Balancers](../advanced-topics/loadbalancer.mdx) for additional detail.

:::caution
Use caution when setting resource limits. Setting limits that are too low may
Expand Down Expand Up @@ -191,7 +190,7 @@ type PiscinaHistogramSummary = {
p99_9: number;
p99_99: number;
p99_999: number;
}
};
```

## `PiscinaLoadBalancer`
Expand Down Expand Up @@ -239,6 +238,7 @@ interface PiscinaWorker {
### Example: Custom Load Balancer

#### JavaScript

<a id="custom-load-balancer-example-js"> </a>

```js
Expand Down Expand Up @@ -279,6 +279,7 @@ piscina
```

#### TypeScript

<a id="custom-load-balancer-example-ts"> </a>

```ts
Expand Down
77 changes: 44 additions & 33 deletions docs/docs/api-reference/method.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@
id: Methods
sidebar_position: 3
---
## Method: `run(task[, options])`

## Method: `run<T>(task[, options]): T | Redeable`

Schedules a task to be run on a Worker thread.

* `task`: Any value. This will be passed to the function that is exported from
- `task`: Any value. This will be passed to the function that is exported from
`filename`.
* `options`:
* `transferList`: An optional lists of objects that is passed to
- `options`:
- `transferList`: An optional lists of objects that is passed to
[`postMessage()`] when posting `task` to the Worker, which are transferred
rather than cloned.
* `filename`: Optionally overrides the `filename` option passed to the
- `filename`: Optionally overrides the `filename` option passed to the
constructor for this task. If no `filename` was specified to the constructor,
this is mandatory.
* `name`: Optionally overrides the exported worker function used for the task.
* `abortSignal`: An `AbortSignal` instance. If passed, this can be used to
- `name`: Optionally overrides the exported worker function used for the task.
- `abortSignal`: An `AbortSignal` instance. If passed, this can be used to
cancel a task. If the task is already running, the corresponding `Worker`
thread will be stopped.
(More generally, any `EventEmitter` or `EventTarget` that emits `'abort'`
Expand All @@ -29,32 +30,42 @@ an error, the returned `Promise` will be rejected with that error.
If the task is aborted, the returned `Promise` is rejected with an error
as well.

## Method: `runTask(task[, transferList][, filename][, abortSignal])`
In case the worker return an `AsyncGenerator` or a `Generator`, the `run` method
will resolve with a [`Readable`](https://nodejs.org/api/stream.html#class-streamreadable) stream that will emit the values produced by the generator.

**Deprecated** -- Use `run(task, options)` instead.
:::caution
The return of a `Redeable` stream is experimental and is subject to further changes
:::

Schedules a task to be run on a Worker thread.
**Example**

* `task`: Any value. This will be passed to the function that is exported from
`filename`.
* `transferList`: An optional lists of objects that is passed to
[`postMessage()`] when posting `task` to the Worker, which are transferred
rather than cloned.
* `filename`: Optionally overrides the `filename` option passed to the
constructor for this task. If no `filename` was specified to the constructor,
this is mandatory.
* `signal`: An [`AbortSignal`][] instance. If passed, this can be used to
cancel a task. If the task is already running, the corresponding `Worker`
thread will be stopped.
(More generally, any `EventEmitter` or `EventTarget` that emits `'abort'`
events can be passed here.) Abortable tasks cannot share threads regardless
of the `concurrentTasksPerWorker` options.
`worker.js`

This returns a `Promise` for the return value of the (async) function call
made to the function exported from `filename`. If the (async) function throws
an error, the returned `Promise` will be rejected with that error.
If the task is aborted, the returned `Promise` is rejected with an error
as well.
```js
module.exports = function* (length) {
for (let i = 0; i < length; i++) {
yield `${i}`;
}
};
```

`main.js`

```js
const { resolve } = require('node:path');
const { Piscina } = require('piscina');

const pool = new Piscina({
filename: resolve(__dirname, 'worker.js'),
});

(async () => {
const stream = pool.run(10);
for await (const value of stream) {
console.log(value);
}
})();
```

## Method: `destroy()`

Expand All @@ -64,9 +75,9 @@ This returns a `Promise` that is fulfilled once all threads have stopped.

## Method: `close([options])`

* `options`:
* `force`: A `boolean` value that indicates whether to abort all tasks that
are enqueued but not started yet. The default is `false`.
- `options`:
- `force`: A `boolean` value that indicates whether to abort all tasks that
are enqueued but not started yet. The default is `false`.

It stops all Workers gracefully.

Expand All @@ -75,4 +86,4 @@ have completed and all threads have stopped.

This method is similar to `destroy()`, but with the difference that `close()`
will wait for the worker tasks to finish, while `destroy()`
will abort them immediately.
will abort them immediately.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
"require": "./dist/main.js"
},
"engines": {
"node": ">=18.x"
"node": ">=20.x"
},
"scripts": {
"build": "tsc && gen-esm-wrapper . dist/esm-wrapper.mjs",
"build:clean": "rm -rf ./dist tsconfig.tsbuildinfo",
"lint": "eslint",
"test": "c8 tap",
"test:ci": "npm run lint && npm run build && npm run test:coverage",
Expand Down
61 changes: 42 additions & 19 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {
PiscinaWorker,
LeastBusyBalancer
} from './worker_pool';
import { WorkerStream } from './worker_pool/worker_stream';
import {
AbortSignalAny,
AbortSignalEventTarget,
Expand Down Expand Up @@ -284,22 +285,28 @@ class ThreadPool {
// remove the `TaskInfo` associated with the Worker, which marks it as
// free again.
const taskInfo = workerInfo.taskInfos.get(taskId);
workerInfo.taskInfos.delete(taskId);

// TODO: we can abstract the task info handling
// right into the pool.workers.taskDone method
pool.workers.taskDone(workerInfo);

/* istanbul ignore if */
if (taskInfo === undefined) {
if (taskInfo == null) {
const err = new Error(
`Unexpected message from Worker: ${inspect(message)}`);
pool.publicInterface.emit('error', err);
workerInfo.taskInfos.delete(taskId);
} else {
taskInfo.done(message.error, result);
}
// Iterator -- yield
if (message.kind === 1) {
taskInfo.done(message.error, message.state === 0 ? null : result, message.error != null || message.state === 0)
} else {
workerInfo.taskInfos.delete(taskId);
taskInfo.done(message.error, result, true);

pool._processPendingMessages();
// TODO: we can abstract the task info handling
// right into the pool.workers.taskDone method
pool.workers.taskDone(workerInfo);

pool._processPendingMessages();
}
}
}

function onReady () {
Expand Down Expand Up @@ -369,7 +376,7 @@ class ThreadPool {
// If there are remaining unfinished tasks, call the callback that was
// passed to `postTask` with the error
for (const taskInfo of taskInfos) {
taskInfo.done(err, null);
taskInfo.done(err, null, true);
}
} else if (!onlyErrorUnfinishedTasks) {
// If there are no unfinished tasks, instead emit an 'error' event
Expand Down Expand Up @@ -514,15 +521,31 @@ class ThreadPool {
transferList,
filename,
name,
(err : Error | null, result : any) => {
(err, result, done) => {
if (done === false) {
if (taskInfo.redeable == null) {
taskInfo.redeable = new WorkerStream();
resolve(taskInfo.redeable);
}

taskInfo.redeable.push(result)
return;
} else if (done === true && taskInfo.redeable != null) {
if (err == null) taskInfo.redeable.push(null);
else taskInfo.redeable.destroy(err)
}

this.completed++;
if (taskInfo.started) {
this.histogram?.recordRunTime(performance.now() - taskInfo.started);
}
if (err !== null) {
reject(err);
} else {
resolve(result);

if (done === true) {
if (err !== null) {
reject(err);
} else {
resolve(result);
}
}

this._maybeDrain();
Expand Down Expand Up @@ -622,11 +645,11 @@ class ThreadPool {
this.destroying = true;
while (this.skipQueue.length > 0) {
const taskInfo : TaskInfo = this.skipQueue.shift() as TaskInfo;
taskInfo.done(new Error('Terminating worker thread'));
taskInfo.done(new Error('Terminating worker thread'), null, true);
}
while (this.taskQueue.size > 0) {
const taskInfo : TaskInfo = this.taskQueue.shift() as TaskInfo;
taskInfo.done(new Error('Terminating worker thread'));
taskInfo.done(new Error('Terminating worker thread'), null, true);
}

const exitEvents : Promise<any[]>[] = [];
Expand All @@ -651,7 +674,7 @@ class ThreadPool {
for (let i = 0; i < skipQueueLength; i++) {
const taskInfo : TaskInfo = this.skipQueue.shift() as TaskInfo;
if (taskInfo.workerInfo === null) {
taskInfo.done(new AbortError('pool is closed'));
taskInfo.done(new AbortError('pool is closed'), null, true);
} else {
this.skipQueue.push(taskInfo);
}
Expand All @@ -661,7 +684,7 @@ class ThreadPool {
for (let i = 0; i < taskQueueLength; i++) {
const taskInfo : TaskInfo = this.taskQueue.shift() as TaskInfo;
if (taskInfo.workerInfo === null) {
taskInfo.done(new AbortError('pool is closed'));
taskInfo.done(new AbortError('pool is closed'), null, true);
} else {
this.taskQueue.push(taskInfo);
}
Expand Down
Loading
Loading