Skip to content

Commit

Permalink
stream: add toArray
Browse files Browse the repository at this point in the history
Add the toArray method from the TC39 iterator helper proposal to
Readable streams. This also enables a common-use case of converting a
stream to an array.

Co-Authored-By: Robert Nagy <ronagy@icloud.com>
PR-URL: #41553
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
benjamingr and ronag committed Jan 20, 2022
1 parent de9dc41 commit 5a407d6
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 0 deletions.
40 changes: 40 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1889,6 +1889,46 @@ await dnsResults.forEach((result) => {
console.log('done'); // Stream has finished
```

### `readable.toArray([options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `options` {Object}
* `signal` {AbortSignal} allows cancelling the toArray operation if the
signal is aborted.
* Returns: {Promise} a promise containing an array (if the stream is in
object mode) or Buffer with the contents of the stream.

This method allows easily obtaining the contents of a stream. If the
stream is in [object mode][object-mode] an array of its contents is returned.
If the stream is not in object mode a Buffer containing its data is returned.

As this method reads the entire stream into memory, it negates the benefits of
streams. It's intended for interoperability and convenience, not as the primary
way to consume streams.

```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]

// Make dns queries concurrently using .map and collect
// the results into an aray using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 }).toArray();
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
17 changes: 17 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict';

const { AbortController } = require('internal/abort_controller');
const { Buffer } = require('buffer');

const {
codes: {
ERR_INVALID_ARG_TYPE,
Expand All @@ -10,6 +12,7 @@ const {
const { validateInteger } = require('internal/validators');

const {
ArrayPrototypePush,
MathFloor,
Promise,
PromiseReject,
Expand Down Expand Up @@ -174,11 +177,25 @@ async function * filter(fn, options) {
yield* this.map(filterFn, options);
}

async function toArray(options) {
const result = [];
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
}
ArrayPrototypePush(result, val);
}
if (!this.readableObjectMode) {
return Buffer.concat(result);
}
return result;
}
module.exports.streamReturningOperators = {
filter,
map,
};

module.exports.promiseReturningOperators = {
forEach,
toArray,
};
79 changes: 79 additions & 0 deletions test/parallel/test-stream-toArray.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');

{
// Works on a synchronous stream
(async () => {
const tests = [
[],
[1],
[1, 2, 3],
Array(100).fill().map((_, i) => i),
];
for (const test of tests) {
const stream = Readable.from(test);
const result = await stream.toArray();
assert.deepStrictEqual(result, test);
}
})().then(common.mustCall());
}

{
// Works on a non-object-mode stream and flattens it
(async () => {
const stream = Readable.from(
[Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6])]
, { objectMode: false });
const result = await stream.toArray();
assert.strictEqual(Buffer.isBuffer(result), true);
assert.deepStrictEqual(Array.from(result), [1, 2, 3, 4, 5, 6]);
})().then(common.mustCall());
}

{
// Works on an asynchronous stream
(async () => {
const tests = [
[],
[1],
[1, 2, 3],
Array(100).fill().map((_, i) => i),
];
for (const test of tests) {
const stream = Readable.from(test).map((x) => Promise.resolve(x));
const result = await stream.toArray();
assert.deepStrictEqual(result, test);
}
})().then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
let stream;
assert.rejects(async () => {
stream = Readable.from([1, 2, 3]).map(async (x) => {
if (x === 3) {
await new Promise(() => {}); // Explicitly do not pass signal here
}
return Promise.resolve(x);
});
await stream.toArray({ signal: ac.signal });
}, {
name: 'AbortError',
}).then(common.mustCall(() => {
// Only stops toArray, does not destory the stream
assert(stream.destroyed, false);
}));
ac.abort();
}
{
// Test result is a Promise
const result = Readable.from([1, 2, 3, 4, 5]).toArray();
assert.strictEqual(result instanceof Promise, true);
}

2 comments on commit 5a407d6

@liel-almog
Copy link

@liel-almog liel-almog commented on 5a407d6 Jan 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ื”ื™ื™ ืื—ื™ ืื•ื”ื‘ ืืช ื”ืขื‘ื•ื“ื” ืฉืœืš, ื™ืฉ ืžืฆื‘ ืืชื” ืขื•ื–ืจ ืœื™ ืœื”ืชื—ื™ืœ ืœืคืชื•ืจ ื“ื‘ืจื™ื ื›ืืœื” ืื ื™ ืžืžืฉ ืจื•ืฆื” ืœื”ื™ื›ื ืก ืœื–ื”.
๐Ÿ˜Ž
ืื—ืจื™ ืฉืชืขื ื” ืœื™ ืืžื—ื•ืง ืืช ื”ื”ืขืจื”

@benjamingr
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liel-almog
ื”ื™ื™, ื”ืื™ืžื™ื™ืœ ืฉืœื™ ื‘ืจื™ื“ืžื™ ืฉืœ ื ื•ื“ ืืชื” ืžื•ื–ืžืŸ ืœืฉืœื•ื— ื”ื•ื“ืขื” ื•ื‘ื›ื™ืฃ

Please sign in to comment.