From 1ae648567afd2e242249cc0c94f28347d30ae74b Mon Sep 17 00:00:00 2001 From: linkgoron Date: Mon, 7 Feb 2022 09:24:17 +0200 Subject: [PATCH] stream: add iterator helper find Continue iterator-helpers work by adding `find` to readable streams. PR-URL: https://github.com/nodejs/node/pull/41849 Reviewed-By: Benjamin Gruenbaum Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- doc/api/stream.md | 89 +++++++-- lib/internal/streams/operators.js | 42 ++--- test/parallel/test-stream-filter.js | 8 + test/parallel/test-stream-flatMap.js | 8 + test/parallel/test-stream-forEach.js | 8 + test/parallel/test-stream-some-every.js | 106 ----------- test/parallel/test-stream-some-find-every.mjs | 174 ++++++++++++++++++ 7 files changed, 282 insertions(+), 153 deletions(-) delete mode 100644 test/parallel/test-stream-some-every.js create mode 100644 test/parallel/test-stream-some-find-every.mjs diff --git a/doc/api/stream.md b/doc/api/stream.md index efc263ccffedc8..833fb08c48ca72 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1691,7 +1691,8 @@ added: v17.4.0 > Stability: 1 - Experimental -* `fn` {Function|AsyncFunction} a function to map over every item in the stream. +* `fn` {Function|AsyncFunction} a function to map over every chunk in the + stream. * `data` {any} a chunk of data from the stream. * `options` {Object} * `signal` {AbortSignal} aborted if the stream is destroyed allowing to @@ -1704,7 +1705,7 @@ added: v17.4.0 * Returns: {Readable} a stream mapped with the function `fn`. This method allows mapping over the stream. The `fn` function will be called -for every item in the stream. If the `fn` function returns a promise - that +for every chunk in the stream. If the `fn` function returns a promise - that promise will be `await`ed before being passed to the result stream. ```mjs @@ -1712,8 +1713,8 @@ import { Readable } from 'stream'; import { Resolver } from 'dns/promises'; // With a synchronous mapper. -for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) { - console.log(item); // 2, 4, 6, 8 +for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) { + console.log(chunk); // 2, 4, 6, 8 } // With an asynchronous mapper, making at most 2 queries at a time. const resolver = new Resolver(); @@ -1735,7 +1736,7 @@ added: v17.4.0 > Stability: 1 - Experimental -* `fn` {Function|AsyncFunction} a function to filter items from stream. +* `fn` {Function|AsyncFunction} a function to filter chunks from the stream. * `data` {any} a chunk of data from the stream. * `options` {Object} * `signal` {AbortSignal} aborted if the stream is destroyed allowing to @@ -1747,8 +1748,8 @@ added: v17.4.0 aborted. * Returns: {Readable} a stream filtered with the predicate `fn`. -This method allows filtering the stream. For each item in the stream the `fn` -function will be called and if it returns a truthy value, the item will be +This method allows filtering the stream. For each chunk in the stream the `fn` +function will be called and if it returns a truthy value, the chunk will be passed to the result stream. If the `fn` function returns a promise - that promise will be `await`ed. @@ -1757,8 +1758,8 @@ import { Readable } from 'stream'; import { Resolver } from 'dns/promises'; // With a synchronous predicate. -for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) { - console.log(item); // 3, 4 +for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) { + console.log(chunk); // 3, 4 } // With an asynchronous predicate, making at most 2 queries at a time. const resolver = new Resolver(); @@ -1784,7 +1785,7 @@ added: REPLACEME > Stability: 1 - Experimental -* `fn` {Function|AsyncFunction} a function to call on each item of the stream. +* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream. * `data` {any} a chunk of data from the stream. * `options` {Object} * `signal` {AbortSignal} aborted if the stream is destroyed allowing to @@ -1796,12 +1797,12 @@ added: REPLACEME aborted. * Returns: {Promise} a promise for when the stream has finished. -This method allows iterating a stream. For each item in the stream the +This method allows iterating a stream. For each chunk in the stream the `fn` function will be called. If the `fn` function returns a promise - that promise will be `await`ed. This method is different from `for await...of` loops in that it can optionally -process items concurrently. In addition, a `forEach` iteration can only be +process chunks concurrently. In addition, a `forEach` iteration can only be stopped by having passed a `signal` option and aborting the related `AbortController` while `for await...of` can be stopped with `break` or `return`. In either case the stream will be destroyed. @@ -1815,8 +1816,8 @@ import { Readable } from 'stream'; import { Resolver } from 'dns/promises'; // With a synchronous predicate. -for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) { - console.log(item); // 3, 4 +for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) { + console.log(chunk); // 3, 4 } // With an asynchronous predicate, making at most 2 queries at a time. const resolver = new Resolver(); @@ -1881,7 +1882,7 @@ added: REPLACEME > Stability: 1 - Experimental -* `fn` {Function|AsyncFunction} a function to call on each item of the stream. +* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream. * `data` {any} a chunk of data from the stream. * `options` {Object} * `signal` {AbortSignal} aborted if the stream is destroyed allowing to @@ -1922,6 +1923,56 @@ console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB console.log('done'); // Stream has finished ``` +### `readable.find(fn[, options])` + + + +> Stability: 1 - Experimental + +* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `options` {Object} + * `concurrency` {number} the maximum concurrent invocation of `fn` to call + on the stream at once. **Default:** `1`. + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Promise} a promise evaluating to the first chunk for which `fn` + evaluated with a truthy value, or `undefined` if no element was found. + +This method is similar to `Array.prototype.find` and calls `fn` on each chunk +in the stream to find a chunk with a truthy value for `fn`. Once an `fn` call's +awaited return value is truthy, the stream is destroyed and the promise is +fulfilled with value for which `fn` returned a truthy value. If all of the +`fn` calls on the chunks return a falsy value, the promise is fulfilled with +`undefined`. + +```mjs +import { Readable } from 'stream'; +import { stat } from 'fs/promises'; + +// With a synchronous predicate. +await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3 +await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1 +await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined + +// With an asynchronous predicate, making at most 2 file checks at a time. +const foundBigFile = await Readable.from([ + 'file1', + 'file2', + 'file3', +]).find(async (fileName) => { + const stats = await stat(fileName); + return stat.size > 1024 * 1024; +}, { concurrency: 2 }); +console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB +console.log('done'); // Stream has finished +``` + ### `readable.every(fn[, options])`