From ddb5152e9b8a914fd62b85e19455d5885fe30faf Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Sun, 12 May 2019 19:00:53 +0200 Subject: [PATCH] stream: implement Readable.from async iterator utility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-URL: https://github.com/nodejs/node/pull/27660 Reviewed-By: Gus Caplan Reviewed-By: James M Snell Reviewed-By: Michaƫl Zasso Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum Reviewed-By: Anna Henningsen --- doc/api/stream.md | 113 ++++++++++++++++++- lib/_stream_readable.js | 39 +++++++ test/parallel/test-events-once.js | 2 +- test/parallel/test-readable-from.js | 163 ++++++++++++++++++++++++++++ 4 files changed, 314 insertions(+), 3 deletions(-) create mode 100644 test/parallel/test-readable-from.js diff --git a/doc/api/stream.md b/doc/api/stream.md index dc6a79db693e96..24f3ee8900f9e8 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -46,8 +46,8 @@ There are four fundamental stream types within Node.js: * [`Transform`][] - `Duplex` streams that can modify or transform the data as it is written and read (for example, [`zlib.createDeflate()`][]). -Additionally, this module includes the utility functions [pipeline][] and -[finished][]. +Additionally, this module includes the utility functions [pipeline][], +[finished][] and [Readable.from][]. ### Object Mode @@ -1445,6 +1445,31 @@ async function run() { run().catch(console.error); ``` +### Readable.from(iterable, [options]) + +* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or + `Symbol.iterator` iterable protocol. +* `options` {Object} Options provided to `new stream.Readable([options])`. + By default, `Readable.from()` will set `options.objectMode` to `true`, unless + this is explicitly opted out by setting `options.objectMode` to `false`. + +A utility method for creating Readable Streams out of iterators. + +```js +const { Readable } = require('stream'); + +async function * generate() { + yield 'hello'; + yield 'streams'; +} + +const readable = Readable.from(generate()); + +readable.on('data', (chunk) => { + console.log(chunk); +}); +``` + ## API for Stream Implementers @@ -2368,6 +2393,89 @@ primarily for examples and testing, but there are some use cases where +### Streams Compatibility with Async Generators and Async Iterators + +With the support of async generators and iterators in JavaScript, async +generators are effectively a first-class language-level stream construct at +this point. + +Some common interop cases of using Node.js streams with async generators +and async iterators are provided below. + +#### Consuming Readable Streams with Async Iterators + +```js +(async function() { + for await (const chunk of readable) { + console.log(chunk); + } +})(); +``` + +#### Creating Readable Streams with Async Generators + +We can construct a Node.js Readable Stream from an asynchronous generator +using the `Readable.from` utility method: + +```js +const { Readable } = require('stream'); + +async function * generate() { + yield 'a'; + yield 'b'; + yield 'c'; +} + +const readable = Readable.from(generate()); + +readable.on('data', (chunk) => { + console.log(chunk); +}); +``` + +#### Piping to Writable Streams from Async Iterators + +In the scenario of writing to a writeable stream from an async iterator, +it is important to ensure the correct handling of backpressure and errors. + +```js +const { once } = require('events'); + +const writeable = fs.createWriteStream('./file'); + +(async function() { + for await (const chunk of iterator) { + // Handle backpressure on write + if (!writeable.write(value)) + await once(writeable, 'drain'); + } + writeable.end(); + // Ensure completion without errors + await once(writeable, 'finish'); +})(); +``` + +In the above, errors on the write stream would be caught and thrown by the two +`once` listeners, since `once` will also handle `'error'` events. + +Alternatively the readable stream could be wrapped with `Readable.from` and +then piped via `.pipe`: + +```js +const { once } = require('events'); + +const writeable = fs.createWriteStream('./file'); + +(async function() { + const readable = Readable.from(iterator); + readable.pipe(writeable); + // Ensure completion without errors + await once(writeable, 'finish'); +})(); +``` + + + ### Compatibility with Older Node.js Versions @@ -2504,6 +2612,7 @@ contain multi-byte characters. [Compatibility]: #stream_compatibility_with_older_node_js_versions [HTTP requests, on the client]: http.html#http_class_http_clientrequest [HTTP responses, on the server]: http.html#http_class_http_serverresponse +[Readable.from]: #readable.from [TCP sockets]: net.html#net_class_net_socket [child process stdin]: child_process.html#child_process_subprocess_stdin [child process stdout and stderr]: child_process.html#child_process_subprocess_stdout diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 03f7924e5c5a3c..bd0185c0446d7f 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -1154,3 +1154,42 @@ function endReadableNT(state, stream) { } } } + +Readable.from = function(iterable, opts) { + let iterator; + if (iterable && iterable[Symbol.asyncIterator]) + iterator = iterable[Symbol.asyncIterator](); + else if (iterable && iterable[Symbol.iterator]) + iterator = iterable[Symbol.iterator](); + else + throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable); + + const readable = new Readable({ + objectMode: true, + ...opts + }); + // Reading boolean to protect against _read + // being called before last iteration completion. + let reading = false; + readable._read = function() { + if (!reading) { + reading = true; + next(); + } + }; + async function next() { + try { + const { value, done } = await iterator.next(); + if (done) { + readable.push(null); + } else if (readable.push(await value)) { + next(); + } else { + reading = false; + } + } catch (err) { + readable.destroy(err); + } + } + return readable; +}; diff --git a/test/parallel/test-events-once.js b/test/parallel/test-events-once.js index f99604018ad0af..25ef4e9845422c 100644 --- a/test/parallel/test-events-once.js +++ b/test/parallel/test-events-once.js @@ -90,4 +90,4 @@ Promise.all([ catchesErrors(), stopListeningAfterCatchingError(), onceError() -]); +]).then(common.mustCall()); diff --git a/test/parallel/test-readable-from.js b/test/parallel/test-readable-from.js new file mode 100644 index 00000000000000..a441f743ccf756 --- /dev/null +++ b/test/parallel/test-readable-from.js @@ -0,0 +1,163 @@ +'use strict'; + +const { mustCall } = require('../common'); +const { once } = require('events'); +const { Readable } = require('stream'); +const { strictEqual } = require('assert'); + +async function toReadableBasicSupport() { + async function * generate() { + yield 'a'; + yield 'b'; + yield 'c'; + } + + const stream = Readable.from(generate()); + + const expected = ['a', 'b', 'c']; + + for await (const chunk of stream) { + strictEqual(chunk, expected.shift()); + } +} + +async function toReadableSyncIterator() { + function * generate() { + yield 'a'; + yield 'b'; + yield 'c'; + } + + const stream = Readable.from(generate()); + + const expected = ['a', 'b', 'c']; + + for await (const chunk of stream) { + strictEqual(chunk, expected.shift()); + } +} + +async function toReadablePromises() { + const promises = [ + Promise.resolve('a'), + Promise.resolve('b'), + Promise.resolve('c') + ]; + + const stream = Readable.from(promises); + + const expected = ['a', 'b', 'c']; + + for await (const chunk of stream) { + strictEqual(chunk, expected.shift()); + } +} + +async function toReadableString() { + const stream = Readable.from('abc'); + + const expected = ['a', 'b', 'c']; + + for await (const chunk of stream) { + strictEqual(chunk, expected.shift()); + } +} + +async function toReadableOnData() { + async function * generate() { + yield 'a'; + yield 'b'; + yield 'c'; + } + + const stream = Readable.from(generate()); + + let iterations = 0; + const expected = ['a', 'b', 'c']; + + stream.on('data', (chunk) => { + iterations++; + strictEqual(chunk, expected.shift()); + }); + + await once(stream, 'end'); + + strictEqual(iterations, 3); +} + +async function toReadableOnDataNonObject() { + async function * generate() { + yield 'a'; + yield 'b'; + yield 'c'; + } + + const stream = Readable.from(generate(), { objectMode: false }); + + let iterations = 0; + const expected = ['a', 'b', 'c']; + + stream.on('data', (chunk) => { + iterations++; + strictEqual(chunk instanceof Buffer, true); + strictEqual(chunk.toString(), expected.shift()); + }); + + await once(stream, 'end'); + + strictEqual(iterations, 3); +} + +async function destroysTheStreamWhenThrowing() { + async function * generate() { + throw new Error('kaboom'); + } + + const stream = Readable.from(generate()); + + stream.read(); + + try { + await once(stream, 'error'); + } catch (err) { + strictEqual(err.message, 'kaboom'); + strictEqual(stream.destroyed, true); + } +} + +async function asTransformStream() { + async function * generate(stream) { + for await (const chunk of stream) { + yield chunk.toUpperCase(); + } + } + + const source = new Readable({ + objectMode: true, + read() { + this.push('a'); + this.push('b'); + this.push('c'); + this.push(null); + } + }); + + const stream = Readable.from(generate(source)); + + const expected = ['A', 'B', 'C']; + + for await (const chunk of stream) { + strictEqual(chunk, expected.shift()); + } +} + +Promise.all([ + toReadableBasicSupport(), + toReadableSyncIterator(), + toReadablePromises(), + toReadableString(), + toReadableOnData(), + toReadableOnDataNonObject(), + destroysTheStreamWhenThrowing(), + asTransformStream() +]).then(mustCall());