Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add toArray helpers #41553

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1889,6 +1889,46 @@ await dnsResults.forEach((result) => {
console.log('done'); // Stream has finished
```

### `readable.toArray([options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `options` {Object}
* `signal` {AbortSignal} allows cancelling the toArray operation if the
signal is aborted.
* Returns: {Promise} a promise containing an array (if the stream is in
object mode) or Buffer with the contents of the stream.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer if this used a Uint8Array instead of Buffer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jasnell .read() returns a buffer so I assumed this should return the same type. As buffers are all Uint8Arrays and this behavior is stream specific I'm not sure what we gain?

Why do you prefer Uint8Array here? Is there some long-term plan to minimize/deprecate Buffer as a type?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you prefer Uint8Array here? Is there some long-term plan to minimize/deprecate Buffer as a type?

Consider it a non-blocking personal preference to start moving away, as much as possible, from Node.js specific APIs where there is a language and web platform standard API option available. We can't change the existing stream.Readable API but we don't have to introduce new dependencies on the implementation specific APIs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ronag @mcollina thoughts?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider it a non-blocking personal preference to start moving away, as much as possible, from Node.js specific APIs where there is a language and web platform standard API option available.

I strongly agree with this too, as a heavy user of Node.js. Buffer makes it harder to create cross-platform packages.

I think this merits a larger discussion in a dedicated issue though. I'd like to see an official stance on this, for Buffer specifically, but also generally.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sindresorhus @jasnell opened #41588 - since this API is experimental I am content with shipping it with buffer and breaking it in a small way in the next version.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the spec say/do here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rule of least surprise would tell me to return a Buffer here. I don't think we should return a UInt8Array unless it's specified by the spec.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ronag If we were to toArray an async iterator it would return a list of all the chunks I would assume it would not concat them. I can change the behavior to an array of buffers but I am not sure it's particularly useful to users.

I suspect/hope that web streams would overload .toArray to do what users would expect (return the stream as an array) rather than a list of chunks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behavior is probably changing and we are going with no non-objectMode specific behavior #41615 - it's also good to Note toArray will always return well... an array :)


This method allows easily obtaining the contents of a stream. If the
stream is in [object mode][object-mode] an array of its contents is returned.
If the stream is not in object mode a Buffer containing its data is returned.

As this method reads the entire stream into memory, it negates the benefits of
streams. It's intended for interoperability and convenience, not as the primary
way to consume streams.

```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]

// Make dns queries concurrently using .map and collect
// the results into an aray using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 }).toArray();
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
17 changes: 17 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict';

const { AbortController } = require('internal/abort_controller');
const { Buffer } = require('buffer');

const {
codes: {
ERR_INVALID_ARG_TYPE,
Expand All @@ -10,6 +12,7 @@ const {
const { validateInteger } = require('internal/validators');

const {
ArrayPrototypePush,
MathFloor,
Promise,
PromiseReject,
Expand Down Expand Up @@ -174,11 +177,25 @@ async function * filter(fn, options) {
yield* this.map(filterFn, options);
}

async function toArray(options) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async function toArray(options) {
/**
* @param {{ signal?: AbortSignal }} [options]
* @returns {Array<unknown> | Buffer}
*/
async function toArray(options = undefined) {

Copy link
Contributor

@Mesteery Mesteery Jan 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async function toArray(options) {
/**
* @param {{ signal?: AbortSignal }} [options]
* @returns {Buffer | any[]}
*/
async function toArray(options = undefined) {

For simple types, ...[] is more common than Array<...> in the codebase (same for any).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have JSDoc comments for any of the other functions in this file, should we maybe do a PR adding it to all of them at once?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with JSDoc not being included. We probably want toArray.length to be 0, to align with the proposal.

Suggested change
async function toArray(options) {
async function toArray(options = undefined) {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aduh95 the .length is different in all the other methods (that take a signal), we've opened an issue to ensure forward-compatibility to ensure that.

Given the primary proposal contributors from what I can tell @devsnek and @ExE-Boss are node contributors I will happily accept guidance with regards to how strict we should be about it :)

const result = [];
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
}
ArrayPrototypePush(result, val);
}
if (!this.readableObjectMode) {
return Buffer.concat(result);
}
return result;
}
module.exports.streamReturningOperators = {
filter,
map,
};

module.exports.promiseReturningOperators = {
forEach,
toArray,
};
79 changes: 79 additions & 0 deletions test/parallel/test-stream-toArray.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');

{
// Works on a synchronous stream
(async () => {
const tests = [
[],
[1],
[1, 2, 3],
Array(100).fill().map((_, i) => i),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can avoid creating intermediary arrays (and imho it's easier to read, but feel free to disagree)

Suggested change
Array(100).fill().map((_, i) => i),
Array.from({ length: 100 }, (_, i) => i),

];
for (const test of tests) {
const stream = Readable.from(test);
const result = await stream.toArray();
assert.deepStrictEqual(result, test);
}
})().then(common.mustCall());
}

{
// Works on a non-object-mode stream and flattens it
(async () => {
const stream = Readable.from(
[Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6])]
, { objectMode: false });
const result = await stream.toArray();
assert.strictEqual(Buffer.isBuffer(result), true);
assert.deepStrictEqual(Array.from(result), [1, 2, 3, 4, 5, 6]);
})().then(common.mustCall());
}

{
// Works on an asynchronous stream
(async () => {
const tests = [
[],
[1],
[1, 2, 3],
Array(100).fill().map((_, i) => i),
];
for (const test of tests) {
const stream = Readable.from(test).map((x) => Promise.resolve(x));
const result = await stream.toArray();
assert.deepStrictEqual(result, test);
}
})().then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
let stream;
assert.rejects(async () => {
stream = Readable.from([1, 2, 3]).map(async (x) => {
if (x === 3) {
await new Promise(() => {}); // Explicitly do not pass signal here
}
return Promise.resolve(x);
});
await stream.toArray({ signal: ac.signal });
}, {
name: 'AbortError',
}).then(common.mustCall(() => {
// Only stops toArray, does not destory the stream
assert(stream.destroyed, false);
}));
ac.abort();
}
{
// Test result is a Promise
const result = Readable.from([1, 2, 3, 4, 5]).toArray();
assert.strictEqual(result instanceof Promise, true);
}