Skip to content

Commit

Permalink
stream: add forEach method
Browse files Browse the repository at this point in the history
Add a `forEach` method to readable streams to enable concurrent
iteration and align with the iterator-helpers proposal.

Co-Authored-By: Robert Nagy <ronagy@icloud.com>
PR-URL: nodejs#41445
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
2 people authored and danielleadams committed Apr 21, 2022
1 parent 647805a commit dcc5831
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 10 deletions.
63 changes: 61 additions & 2 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,7 @@ added: v16.14.0
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
Expand Down Expand Up @@ -1740,7 +1740,7 @@ added: v16.14.0
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
Expand Down Expand Up @@ -1775,6 +1775,65 @@ for await (const result of dnsResults) {
}
```

### `readable.forEach(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Promise} a promise for when the stream has finished.

This method allows iterating a stream. For each item in the stream the
`fn` function will be called. If the `fn` function returns a promise - that
promise will be `await`ed.

This method is different from `for await...of` loops in that it can optionally
process items concurrently. In addition, a `forEach` iteration can only be
stopped by having passed a `signal` option and aborting the related
`AbortController` while `for await...of` can be stopped with `break` or
`return`. In either case the stream will be destroyed.

This method is different from listening to the [`'data'`][] event in that it
uses the [`readable`][] event in the underlying machinary and can limit the
number of concurrent `fn` calls.

```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

// With a synchronous predicate.
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(item); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
// Logs result, similar to `for await (const result of dnsResults)`
console.log(result);
});
console.log('done'); // Stream has finished
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
28 changes: 23 additions & 5 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const kEof = Symbol('kEof');
async function * map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], this);
'fn', ['Function', 'AsyncFunction'], fn);
}

if (options != null && typeof options !== 'object') {
Expand Down Expand Up @@ -147,10 +147,23 @@ async function * map(fn, options) {
}
}

async function forEach(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}
async function forEachFn(value, options) {
await fn(value, options);
return kEmpty;
}
// eslint-disable-next-line no-unused-vars
for await (const unused of this.map(forEachFn, options));
}

async function * filter(fn, options) {
if (typeof fn !== 'function') {
throw (new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], this));
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}
async function filterFn(value, options) {
if (await fn(value, options)) {
Expand All @@ -160,7 +173,12 @@ async function * filter(fn, options) {
}
yield* this.map(filterFn, options);
}
module.exports = {

module.exports.streamReturningOperators = {
filter,
map,
filter
};

module.exports.promiseReturningOperators = {
forEach,
};
15 changes: 12 additions & 3 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ const {
promisify: { custom: customPromisify },
} = require('internal/util');

const operators = require('internal/streams/operators');
const {
streamReturningOperators,
promiseReturningOperators,
} = require('internal/streams/operators');
const compose = require('internal/streams/compose');
const { pipeline } = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
Expand All @@ -46,12 +49,18 @@ Stream.isDisturbed = utils.isDisturbed;
Stream.isErrored = utils.isErrored;
Stream.isReadable = utils.isReadable;
Stream.Readable = require('internal/streams/readable');
for (const key of ObjectKeys(operators)) {
const op = operators[key];
for (const key of ObjectKeys(streamReturningOperators)) {
const op = streamReturningOperators[key];
Stream.Readable.prototype[key] = function(...args) {
return Stream.Readable.from(ReflectApply(op, this, args));
};
}
for (const key of ObjectKeys(promiseReturningOperators)) {
const op = promiseReturningOperators[key];
Stream.Readable.prototype[key] = function(...args) {
return ReflectApply(op, this, args);
};
}
Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Expand Down
86 changes: 86 additions & 0 deletions test/parallel/test-stream-forEach.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');

{
// forEach works on synchronous streams with a synchronous predicate
const stream = Readable.from([1, 2, 3]);
const result = [1, 2, 3];
(async () => {
await stream.forEach((value) => assert.strictEqual(value, result.shift()));
})().then(common.mustCall());
}

{
// forEach works an asynchronous streams
const stream = Readable.from([1, 2, 3]).filter(async (x) => {
await Promise.resolve();
return true;
});
const result = [1, 2, 3];
(async () => {
await stream.forEach((value) => assert.strictEqual(value, result.shift()));
})().then(common.mustCall());
}

{
// forEach works on asynchronous streams with a asynchronous forEach fn
const stream = Readable.from([1, 2, 3]).filter(async (x) => {
await Promise.resolve();
return true;
});
const result = [1, 2, 3];
(async () => {
await stream.forEach(async (value) => {
await Promise.resolve();
assert.strictEqual(value, result.shift());
});
})().then(common.mustCall());
}

{
// Concurrency + AbortSignal
const ac = new AbortController();
let calls = 0;
const forEachPromise =
Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
calls++;
await setTimeout(100, { signal });
}, { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
await forEachPromise;
}, {
name: 'AbortError',
}).then(common.mustCall());

setImmediate(() => {
ac.abort();
assert.strictEqual(calls, 2);
});
}

{
// Error cases
assert.rejects(async () => {
await Readable.from([1]).forEach(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1]).forEach((x) => x, {
concurrency: 'Foo'
});
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1]).forEach((x) => x, 1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
// Test result is a Promise
const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true);
assert.strictEqual(typeof stream.then, 'function');
}

0 comments on commit dcc5831

Please sign in to comment.