diff --git a/doc/api/stream.md b/doc/api/stream.md index 83b7391605f1c1..2c8714157e65e4 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2008,6 +2008,10 @@ showBoth(); added: - v17.4.0 - v16.14.0 +changes: + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/49249 + description: added `highWaterMark` in options. --> > Stability: 1 - Experimental @@ -2021,6 +2025,8 @@ added: * `options` {Object} * `concurrency` {number} the maximum concurrent invocation of `fn` to call on the stream at once. **Default:** `1`. + * `highWaterMark` {number} how many items to buffer while waiting for user + consumption of the mapped items. **Default:** `concurrency * 2 - 1`. * `signal` {AbortSignal} allows destroying the stream if the signal is aborted. * Returns: {Readable} a stream mapped with the function `fn`. @@ -2055,6 +2061,10 @@ for await (const result of dnsResults) { added: - v17.4.0 - v16.14.0 +changes: + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/49249 + description: added `highWaterMark` in options. --> > Stability: 1 - Experimental @@ -2067,6 +2077,8 @@ added: * `options` {Object} * `concurrency` {number} the maximum concurrent invocation of `fn` to call on the stream at once. **Default:** `1`. + * `highWaterMark` {number} how many items to buffer while waiting for user + consumption of the filtered items. **Default:** `concurrency * 2 - 1`. * `signal` {AbortSignal} allows destroying the stream if the signal is aborted. * Returns: {Readable} a stream filtered with the predicate `fn`. diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 47208136e0916d..b8dde2a5b9ee8c 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -33,6 +33,7 @@ const { NumberIsNaN, Promise, PromiseReject, + PromiseResolve, PromisePrototypeThen, Symbol, } = primordials; @@ -82,7 +83,15 @@ function map(fn, options) { concurrency = MathFloor(options.concurrency); } - validateInteger(concurrency, 'concurrency', 1); + let highWaterMark = concurrency - 1; + if (options?.highWaterMark != null) { + highWaterMark = MathFloor(options.highWaterMark); + } + + validateInteger(concurrency, 'options.concurrency', 1); + validateInteger(highWaterMark, 'options.highWaterMark', 0); + + highWaterMark += concurrency; return async function* map() { const signal = AbortSignal.any([options?.signal].filter(Boolean)); @@ -93,9 +102,28 @@ function map(fn, options) { let next; let resume; let done = false; + let cnt = 0; - function onDone() { + function onCatch() { done = true; + afterItemProcessed(); + } + + function afterItemProcessed() { + cnt -= 1; + maybeResume(); + } + + function maybeResume() { + if ( + resume && + !done && + cnt < concurrency && + queue.length < highWaterMark + ) { + resume(); + resume = null; + } } async function pump() { @@ -111,17 +139,19 @@ function map(fn, options) { try { val = fn(val, signalOpt); + + if (val === kEmpty) { + continue; + } + + val = PromiseResolve(val); } catch (err) { val = PromiseReject(err); } - if (val === kEmpty) { - continue; - } + cnt += 1; - if (typeof val?.catch === 'function') { - val.catch(onDone); - } + PromisePrototypeThen(val, afterItemProcessed, onCatch); queue.push(val); if (next) { @@ -129,7 +159,7 @@ function map(fn, options) { next = null; } - if (!done && queue.length && queue.length >= concurrency) { + if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) { await new Promise((resolve) => { resume = resolve; }); @@ -138,7 +168,7 @@ function map(fn, options) { queue.push(kEof); } catch (err) { const val = PromiseReject(err); - PromisePrototypeThen(val, undefined, onDone); + PromisePrototypeThen(val, afterItemProcessed, onCatch); queue.push(val); } finally { done = true; @@ -169,10 +199,7 @@ function map(fn, options) { } queue.shift(); - if (resume) { - resume(); - resume = null; - } + maybeResume(); } await new Promise((resolve) => { diff --git a/test/parallel/test-stream-forEach.js b/test/parallel/test-stream-forEach.js index 7a21e299534742..627ea0ccf1be60 100644 --- a/test/parallel/test-stream-forEach.js +++ b/test/parallel/test-stream-forEach.js @@ -96,7 +96,7 @@ const { once } = require('events'); Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => { calls++; await once(signal, 'abort'); - }, { signal: ac.signal, concurrency: 2 }); + }, { signal: ac.signal, concurrency: 2, highWaterMark: 0 }); // pump assert.rejects(async () => { await forEachPromise; diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js index ba0571fe3a7b95..4a7a53c55960ea 100644 --- a/test/parallel/test-stream-map.js +++ b/test/parallel/test-stream-map.js @@ -8,6 +8,25 @@ const assert = require('assert'); const { once } = require('events'); const { setTimeout } = require('timers/promises'); +function createDependentPromises(n) { + const promiseAndResolveArray = []; + + for (let i = 0; i < n; i++) { + let res; + const promise = new Promise((resolve) => { + if (i === 0) { + res = resolve; + return; + } + res = () => promiseAndResolveArray[i - 1][0].then(resolve); + }); + + promiseAndResolveArray.push([promise, res]); + } + + return promiseAndResolveArray; +} + { // Map works on synchronous streams with a synchronous mapper const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x); @@ -143,7 +162,7 @@ const { setTimeout } = require('timers/promises'); const stream = range.map(common.mustCall(async (_, { signal }) => { await once(signal, 'abort'); throw signal.reason; - }, 2), { signal: ac.signal, concurrency: 2 }); + }, 2), { signal: ac.signal, concurrency: 2, highWaterMark: 0 }); // pump assert.rejects(async () => { for await (const item of stream) { @@ -173,12 +192,164 @@ const { setTimeout } = require('timers/promises'); })().then(common.mustCall()); } + +{ + // highWaterMark with small concurrency + const finishOrder = []; + + const promises = createDependentPromises(4); + + const raw = Readable.from([2, 0, 1, 3]); + const stream = raw.map(async (item) => { + const [promise, resolve] = promises[item]; + resolve(); + + await promise; + finishOrder.push(item); + return item; + }, { concurrency: 2 }); + + (async () => { + await stream.toArray(); + + assert.deepStrictEqual(finishOrder, [0, 1, 2, 3]); + })().then(common.mustCall(), common.mustNotCall()); +} + +{ + // highWaterMark with a lot of items and large concurrency + const finishOrder = []; + + const promises = createDependentPromises(20); + + const input = [10, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 11, 12, 13, 18, 15, 16, 17, 14, 19]; + const raw = Readable.from(input); + // Should be + // 10, 1, 0, 3, 4, 2 | next: 0 + // 10, 1, 3, 4, 2, 5 | next: 1 + // 10, 3, 4, 2, 5, 7 | next: 2 + // 10, 3, 4, 5, 7, 8 | next: 3 + // 10, 4, 5, 7, 8, 9 | next: 4 + // 10, 5, 7, 8, 9, 6 | next: 5 + // 10, 7, 8, 9, 6, 11 | next: 6 + // 10, 7, 8, 9, 11, 12 | next: 7 + // 10, 8, 9, 11, 12, 13 | next: 8 + // 10, 9, 11, 12, 13, 18 | next: 9 + // 10, 11, 12, 13, 18, 15 | next: 10 + // 11, 12, 13, 18, 15, 16 | next: 11 + // 12, 13, 18, 15, 16, 17 | next: 12 + // 13, 18, 15, 16, 17, 14 | next: 13 + // 18, 15, 16, 17, 14, 19 | next: 14 + // 18, 15, 16, 17, 19 | next: 15 + // 18, 16, 17, 19 | next: 16 + // 18, 17, 19 | next: 17 + // 18, 19 | next: 18 + // 19 | next: 19 + // + + const stream = raw.map(async (item) => { + const [promise, resolve] = promises[item]; + resolve(); + + await promise; + finishOrder.push(item); + return item; + }, { concurrency: 6 }); + + (async () => { + const outputOrder = await stream.toArray(); + + assert.deepStrictEqual(outputOrder, input); + assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]); + })().then(common.mustCall(), common.mustNotCall()); +} + +{ + // Custom highWaterMark with a lot of items and large concurrency + const finishOrder = []; + + const promises = createDependentPromises(20); + + const input = [11, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 10, 12, 13, 18, 15, 16, 17, 14, 19]; + const raw = Readable.from(input); + // Should be + // 11, 1, 0, 3, 4 | next: 0, buffer: [] + // 11, 1, 3, 4, 2 | next: 1, buffer: [0] + // 11, 3, 4, 2, 5 | next: 2, buffer: [0, 1] + // 11, 3, 4, 5, 7 | next: 3, buffer: [0, 1, 2] + // 11, 4, 5, 7, 8 | next: 4, buffer: [0, 1, 2, 3] + // 11, 5, 7, 8, 9 | next: 5, buffer: [0, 1, 2, 3, 4] + // 11, 7, 8, 9, 6 | next: 6, buffer: [0, 1, 2, 3, 4, 5] + // 11, 7, 8, 9, 10 | next: 7, buffer: [0, 1, 2, 3, 4, 5, 6] -- buffer full + // 11, 8, 9, 10, 12 | next: 8, buffer: [0, 1, 2, 3, 4, 5, 6] + // 11, 9, 10, 12, 13 | next: 9, buffer: [0, 1, 2, 3, 4, 5, 6] + // 11, 10, 12, 13, 18 | next: 10, buffer: [0, 1, 2, 3, 4, 5, 6] + // 11, 12, 13, 18, 15 | next: 11, buffer: [0, 1, 2, 3, 4, 5, 6] + // 12, 13, 18, 15, 16 | next: 12, buffer: [] -- all items flushed as 11 is consumed and all the items wait for it + // 13, 18, 15, 16, 17 | next: 13, buffer: [] + // 18, 15, 16, 17, 14 | next: 14, buffer: [] + // 18, 15, 16, 17, 19 | next: 15, buffer: [14] + // 18, 16, 17, 19 | next: 16, buffer: [14, 15] + // 18, 17, 19 | next: 17, buffer: [14, 15, 16] + // 18, 19 | next: 18, buffer: [14, 15, 16, 17] + // 19 | next: 19, buffer: [] -- all items flushed + // + + const stream = raw.map(async (item) => { + const [promise, resolve] = promises[item]; + resolve(); + + await promise; + finishOrder.push(item); + return item; + }, { concurrency: 5, highWaterMark: 7 }); + + (async () => { + const outputOrder = await stream.toArray(); + + assert.deepStrictEqual(outputOrder, input); + assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]); + })().then(common.mustCall(), common.mustNotCall()); +} + +{ + // Where there is a delay between the first and the next item it should not wait for filled queue + // before yielding to the user + const promises = createDependentPromises(3); + + const raw = Readable.from([0, 1, 2]); + + const stream = raw + .map(async (item) => { + if (item !== 0) { + await promises[item][0]; + } + + return item; + }, { concurrency: 2 }) + .map((item) => { + // eslint-disable-next-line no-unused-vars + for (const [_, resolve] of promises) { + resolve(); + } + + return item; + }); + + (async () => { + await stream.toArray(); + })().then(common.mustCall(), common.mustNotCall()); +} + { // Error cases assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/); assert.throws(() => Readable.from([1]).map((x) => x, { concurrency: 'Foo' }), /ERR_OUT_OF_RANGE/); + assert.throws(() => Readable.from([1]).map((x) => x, { + concurrency: -1 + }), /ERR_OUT_OF_RANGE/); assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/); assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/); }