diff --git a/doc/api/stream.md b/doc/api/stream.md index bbbc28acf7f5da..e8b8b0c5ef6a4e 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1694,6 +1694,50 @@ async function showBoth() { showBoth(); ``` +### `readable.map(fn[, options])` + + + +> Stability: 1 - Experimental + +* `fn` {Function|AsyncFunction} a function to map over every item in the stream. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `options` {Object} + * `concurrency` {number} the maximal concurrent invocation of `fn` to call + on the stream at once. **Default:** `1`. + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Readable} a stream mapped with the function `fn`. + +This method allows mapping over the stream. The `fn` function will be called +for every item in the stream. If the `fn` function returns a promise - that +promise will be `await`ed before being passed to the result stream. + +```mjs +import { Readable } from 'stream'; +import { Resolver } from 'dns/promises'; + +// With a synchronous mapper. +for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) { + console.log(item); // 2, 4, 6, 8 +} +// With an asynchronous mapper, making at most 2 queries at a time. +const resolver = new Resolver(); +const dnsResults = await Readable.from([ + 'nodejs.org', + 'openjsf.org', + 'www.linuxfoundation.org', +]).map((domain) => resolver.resolve4(domain), { concurrency: 2 }); +for await (const result of dnsResults) { + console.log(result); // Logs the DNS result of resolver.resolve4. +} +``` + ### Duplex and transform streams #### Class: `stream.Duplex` diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js new file mode 100644 index 00000000000000..7ffbbebd332d9e --- /dev/null +++ b/lib/internal/streams/operators.js @@ -0,0 +1,152 @@ +'use strict'; + +const { AbortController } = require('internal/abort_controller'); +const { + codes: { + ERR_INVALID_ARG_TYPE, + }, + AbortError, +} = require('internal/errors'); +const { validateInteger } = require('internal/validators'); + +const { + MathFloor, + Promise, + PromiseReject, + PromisePrototypeCatch, + Symbol, +} = primordials; + +const kEmpty = Symbol('kEmpty'); +const kEof = Symbol('kEof'); + +async function * map(fn, options) { + if (typeof fn !== 'function') { + throw new ERR_INVALID_ARG_TYPE( + 'fn', ['Function', 'AsyncFunction'], this); + } + + if (options != null && typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + } + + let concurrency = 1; + if (options?.concurrency != null) { + concurrency = MathFloor(options.concurrency); + } + + validateInteger(concurrency, 'concurrency', 1); + + const ac = new AbortController(); + const stream = this; + const queue = []; + const signal = ac.signal; + const signalOpt = { signal }; + + const abort = () => ac.abort(); + options?.signal?.addEventListener('abort', abort); + + let next; + let resume; + let done = false; + + function onDone() { + done = true; + } + + async function pump() { + try { + for await (let val of stream) { + if (done) { + return; + } + + if (signal.aborted) { + throw new AbortError(); + } + + try { + val = fn(val, signalOpt); + } catch (err) { + val = PromiseReject(err); + } + + if (val === kEmpty) { + continue; + } + + if (typeof val?.catch === 'function') { + val.catch(onDone); + } + + queue.push(val); + if (next) { + next(); + next = null; + } + + if (!done && queue.length && queue.length >= concurrency) { + await new Promise((resolve) => { + resume = resolve; + }); + } + } + queue.push(kEof); + } catch (err) { + const val = PromiseReject(err); + PromisePrototypeCatch(val, onDone); + queue.push(val); + } finally { + done = true; + if (next) { + next(); + next = null; + } + options?.signal?.removeEventListener('abort', abort); + } + } + + pump(); + + try { + while (true) { + while (queue.length > 0) { + const val = await queue[0]; + + if (val === kEof) { + return; + } + + if (signal.aborted) { + throw new AbortError(); + } + + if (val !== kEmpty) { + yield val; + } + + queue.shift(); + if (resume) { + resume(); + resume = null; + } + } + + await new Promise((resolve) => { + next = resolve; + }); + } + } finally { + ac.abort(); + + done = true; + if (resume) { + resume(); + resume = null; + } + } +} + +module.exports = { + map, +}; diff --git a/lib/stream.js b/lib/stream.js index c7f61cf8873786..0c94011a532bd1 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -23,12 +23,15 @@ const { ObjectDefineProperty, + ObjectKeys, + ReflectApply, } = primordials; const { promisify: { custom: customPromisify }, } = require('internal/util'); +const operators = require('internal/streams/operators'); const compose = require('internal/streams/compose'); const { pipeline } = require('internal/streams/pipeline'); const { destroyer } = require('internal/streams/destroy'); @@ -42,6 +45,12 @@ const Stream = module.exports = require('internal/streams/legacy').Stream; Stream.isDisturbed = utils.isDisturbed; Stream.isErrored = utils.isErrored; Stream.Readable = require('internal/streams/readable'); +for (const key of ObjectKeys(operators)) { + const op = operators[key]; + Stream.Readable.prototype[key] = function(...args) { + return Stream.Readable.from(ReflectApply(op, this, args)); + }; +} Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); Stream.Transform = require('internal/streams/transform'); diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index e54884ba46004f..feb1f5ce298acf 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -109,6 +109,7 @@ const expectedModules = new Set([ 'NativeModule internal/streams/end-of-stream', 'NativeModule internal/streams/from', 'NativeModule internal/streams/legacy', + 'NativeModule internal/streams/operators', 'NativeModule internal/streams/passthrough', 'NativeModule internal/streams/pipeline', 'NativeModule internal/streams/readable', diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js new file mode 100644 index 00000000000000..2d5c5894e1eb0f --- /dev/null +++ b/test/parallel/test-stream-map.js @@ -0,0 +1,108 @@ +'use strict'; + +const common = require('../common'); +const { + Readable, +} = require('stream'); +const assert = require('assert'); +const { setTimeout } = require('timers/promises'); + +{ + // Map works on synchronous streams with a synchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x); + const result = [2, 4, 6, 8, 10]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Map works on synchronous streams with an asynchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + await Promise.resolve(); + return x + x; + }); + const result = [2, 4, 6, 8, 10]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Map works on asynchronous streams with a asynchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + return x + x; + }).map((x) => x + x); + const result = [4, 8, 12, 16, 20]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Concurrency + AbortSignal + const ac = new AbortController(); + let calls = 0; + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => { + calls++; + await setTimeout(100, { signal }); + }, { signal: ac.signal, concurrency: 2 }); + // pump + assert.rejects(async () => { + for await (const item of stream) { + // nope + console.log(item); + } + }, { + name: 'AbortError', + }).then(common.mustCall()); + + setImmediate(() => { + ac.abort(); + assert.strictEqual(calls, 2); + }); +} + +{ + // Concurrency result order + const stream = Readable.from([1, 2]).map(async (item, { signal }) => { + await setTimeout(10 - item, { signal }); + return item; + }, { concurrency: 2 }); + + (async () => { + const expected = [1, 2]; + for await (const item of stream) { + assert.strictEqual(item, expected.shift()); + } + })().then(common.mustCall()); +} + +{ + // Error cases + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const unused of Readable.from([1]).map(1)); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of Readable.from([1]).map((x) => x, { + concurrency: 'Foo' + })); + }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of Readable.from([1]).map((x) => x, 1)); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); +} +{ + // Test result is a Readable + const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x); + assert.strictEqual(stream.readable, true); +} diff --git a/tools/doc/type-parser.mjs b/tools/doc/type-parser.mjs index 562a3323bbb195..296eec53c201e2 100644 --- a/tools/doc/type-parser.mjs +++ b/tools/doc/type-parser.mjs @@ -203,12 +203,12 @@ const customTypesMap = { 'Stream': 'stream.html#stream', 'stream.Duplex': 'stream.html#class-streamduplex', - 'stream.Readable': 'stream.html#class-streamreadable', - 'stream.Transform': 'stream.html#class-streamtransform', - 'stream.Writable': 'stream.html#class-streamwritable', 'Duplex': 'stream.html#class-streamduplex', + 'stream.Readable': 'stream.html#class-streamreadable', 'Readable': 'stream.html#class-streamreadable', + 'stream.Transform': 'stream.html#class-streamtransform', 'Transform': 'stream.html#class-streamtransform', + 'stream.Writable': 'stream.html#class-streamwritable', 'Writable': 'stream.html#class-streamwritable', 'Immediate': 'timers.html#class-immediate',