diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js index 2d5c5894e1eb0f..c3a994ae39bc96 100644 --- a/test/parallel/test-stream-map.js +++ b/test/parallel/test-stream-map.js @@ -5,16 +5,14 @@ const { Readable, } = require('stream'); const assert = require('assert'); +const { once } = require('events'); const { setTimeout } = require('timers/promises'); { // Map works on synchronous streams with a synchronous mapper const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x); - const result = [2, 4, 6, 8, 10]; (async () => { - for await (const item of stream) { - assert.strictEqual(item, result.shift()); - } + assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]); })().then(common.mustCall()); } @@ -24,7 +22,49 @@ const { setTimeout } = require('timers/promises'); await Promise.resolve(); return x + x; }); - const result = [2, 4, 6, 8, 10]; + (async () => { + assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]); + })().then(common.mustCall()); +} + +{ + // Map works on asynchronous streams with a asynchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + return x + x; + }).map((x) => x + x); + (async () => { + assert.deepStrictEqual(await stream.toArray(), [4, 8, 12, 16, 20]); + })().then(common.mustCall()); +} + +{ + // Map works on an infinite stream + const stream = Readable.from(async function* () { + while (true) yield 1; + }()).map(common.mustCall(async (x) => { + return x + x; + }, 5)); + (async () => { + let i = 1; + for await (const item of stream) { + assert.strictEqual(item, 2); + if (++i === 5) break; + } + })().then(common.mustCall()); +} + +{ + // Map works on non-objectMode streams + const stream = new Readable({ + read() { + this.push(Uint8Array.from([1])); + this.push(Uint8Array.from([2])); + this.push(null); + } + }).map(async ([x]) => { + return x + x; + }).map((x) => x + x); + const result = [4, 8]; (async () => { for await (const item of stream) { assert.strictEqual(item, result.shift()); @@ -33,11 +73,19 @@ const { setTimeout } = require('timers/promises'); } { - // Map works on asynchronous streams with a asynchronous mapper - const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + // Does not care about data events + const source = new Readable({ + read() { + this.push(Uint8Array.from([1])); + this.push(Uint8Array.from([2])); + this.push(null); + } + }); + setImmediate(() => stream.emit('data', Uint8Array.from([1]))); + const stream = source.map(async ([x]) => { return x + x; }).map((x) => x + x); - const result = [4, 8, 12, 16, 20]; + const result = [4, 8]; (async () => { for await (const item of stream) { assert.strictEqual(item, result.shift()); @@ -45,19 +93,61 @@ const { setTimeout } = require('timers/promises'); })().then(common.mustCall()); } +{ + // Emitting an error during `map` + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + if (x === 3) { + stream.emit('error', new Error('boom')); + } + return x + x; + }); + assert.rejects( + stream.map((x) => x + x).toArray(), + /boom/, + ).then(common.mustCall()); +} + +{ + // Throwing an error during `map` (sync) + const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => { + if (x === 3) { + throw new Error('boom'); + } + return x + x; + }); + assert.rejects( + stream.map((x) => x + x).toArray(), + /boom/, + ).then(common.mustCall()); +} + + +{ + // Throwing an error during `map` (async) + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + if (x === 3) { + throw new Error('boom'); + } + return x + x; + }); + assert.rejects( + stream.map((x) => x + x).toArray(), + /boom/, + ).then(common.mustCall()); +} + { // Concurrency + AbortSignal const ac = new AbortController(); - let calls = 0; - const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => { - calls++; - await setTimeout(100, { signal }); - }, { signal: ac.signal, concurrency: 2 }); + const range = Readable.from([1, 2, 3, 4, 5]); + const stream = range.map(common.mustCall(async (_, { signal }) => { + await once(signal, 'abort'); + throw signal.reason; + }, 2), { signal: ac.signal, concurrency: 2 }); // pump assert.rejects(async () => { for await (const item of stream) { - // nope - console.log(item); + assert.fail('should not reach here, got ' + item); } }, { name: 'AbortError', @@ -65,7 +155,6 @@ const { setTimeout } = require('timers/promises'); setImmediate(() => { ac.abort(); - assert.strictEqual(calls, 2); }); }