diff --git a/benchmark/streams/readable-async-iterator.js b/benchmark/streams/readable-async-iterator.js deleted file mode 100644 index dbe335ab98b651..00000000000000 --- a/benchmark/streams/readable-async-iterator.js +++ /dev/null @@ -1,38 +0,0 @@ -'use strict'; - -const common = require('../common'); -const Readable = require('stream').Readable; - -const bench = common.createBenchmark(main, { - n: [1e5], - sync: ['yes', 'no'], -}); - -async function main({ n, sync }) { - sync = sync === 'yes'; - - const s = new Readable({ - objectMode: true, - read() { - if (sync) { - this.push(1); - } else { - process.nextTick(() => { - this.push(1); - }); - } - } - }); - - bench.start(); - - let x = 0; - for await (const chunk of s) { - x += chunk; - if (x > n) { - break; - } - } - - bench.end(n); -} diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index a8e104588f1028..af8786198674cb 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -27,7 +27,6 @@ const { NumberIsNaN, ObjectDefineProperties, ObjectSetPrototypeOf, - Promise, Set, SymbolAsyncIterator, Symbol @@ -60,11 +59,11 @@ const kPaused = Symbol('kPaused'); // Lazy loaded to improve the startup performance. let StringDecoder; +let createReadableStreamAsyncIterator; let from; ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream); -function nop() {} const { errorOrDestroy } = destroyImpl; @@ -1076,68 +1075,13 @@ Readable.prototype.wrap = function(stream) { }; Readable.prototype[SymbolAsyncIterator] = function() { - let stream = this; - - if (typeof stream.read !== 'function') { - // v1 stream - const src = stream; - stream = new Readable({ - objectMode: true, - destroy(err, callback) { - destroyImpl.destroyer(src, err); - callback(); - } - }).wrap(src); + if (createReadableStreamAsyncIterator === undefined) { + createReadableStreamAsyncIterator = + require('internal/streams/async_iterator'); } - - const iter = createAsyncIterator(stream); - iter.stream = stream; - return iter; + return createReadableStreamAsyncIterator(this); }; -async function* createAsyncIterator(stream) { - let callback = nop; - - function next(resolve) { - if (this === stream) { - callback(); - callback = nop; - } else { - callback = resolve; - } - } - - stream - .on('readable', next) - .on('error', next) - .on('end', next) - .on('close', next); - - try { - const state = stream._readableState; - while (true) { - const chunk = stream.read(); - if (chunk !== null) { - yield chunk; - } else if (state.errored) { - throw state.errored; - } else if (state.ended) { - break; - } else if (state.closed) { - // TODO(ronag): ERR_PREMATURE_CLOSE? - break; - } else { - await new Promise(next); - } - } - } catch (err) { - destroyImpl.destroyer(stream, err); - throw err; - } finally { - destroyImpl.destroyer(stream, null); - } -} - // Making it explicit these properties are not enumerable // because otherwise some prototype manipulation in // userland will fail. diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js new file mode 100644 index 00000000000000..87b7f227d59070 --- /dev/null +++ b/lib/internal/streams/async_iterator.js @@ -0,0 +1,221 @@ +'use strict'; + +const { + ObjectCreate, + ObjectGetPrototypeOf, + ObjectSetPrototypeOf, + Promise, + PromiseReject, + PromiseResolve, + Symbol, +} = primordials; + +const finished = require('internal/streams/end-of-stream'); +const destroyImpl = require('internal/streams/destroy'); + +const kLastResolve = Symbol('lastResolve'); +const kLastReject = Symbol('lastReject'); +const kError = Symbol('error'); +const kEnded = Symbol('ended'); +const kLastPromise = Symbol('lastPromise'); +const kHandlePromise = Symbol('handlePromise'); +const kStream = Symbol('stream'); + +let Readable; + +function createIterResult(value, done) { + return { value, done }; +} + +function readAndResolve(iter) { + const resolve = iter[kLastResolve]; + if (resolve !== null) { + const data = iter[kStream].read(); + // We defer if data is null. We can be expecting either 'end' or 'error'. + if (data !== null) { + iter[kLastPromise] = null; + iter[kLastResolve] = null; + iter[kLastReject] = null; + resolve(createIterResult(data, false)); + } + } +} + +function onReadable(iter) { + // We wait for the next tick, because it might + // emit an error with `process.nextTick()`. + process.nextTick(readAndResolve, iter); +} + +function wrapForNext(lastPromise, iter) { + return (resolve, reject) => { + lastPromise.then(() => { + if (iter[kEnded]) { + resolve(createIterResult(undefined, true)); + return; + } + + iter[kHandlePromise](resolve, reject); + }, reject); + }; +} + +const AsyncIteratorPrototype = ObjectGetPrototypeOf( + ObjectGetPrototypeOf(async function* () {}).prototype); + +function finish(self, err) { + return new Promise((resolve, reject) => { + const stream = self[kStream]; + + finished(stream, (err) => { + if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + reject(err); + } else { + resolve(createIterResult(undefined, true)); + } + }); + destroyImpl.destroyer(stream, err); + }); +} + +const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ + get stream() { + return this[kStream]; + }, + + next() { + // If we have detected an error in the meanwhile + // reject straight away. + const error = this[kError]; + if (error !== null) { + return PromiseReject(error); + } + + if (this[kEnded]) { + return PromiseResolve(createIterResult(undefined, true)); + } + + if (this[kStream].destroyed) { + return new Promise((resolve, reject) => { + if (this[kError]) { + reject(this[kError]); + } else if (this[kEnded]) { + resolve(createIterResult(undefined, true)); + } else { + finished(this[kStream], (err) => { + if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + reject(err); + } else { + resolve(createIterResult(undefined, true)); + } + }); + } + }); + } + + // If we have multiple next() calls we will wait for the previous Promise to + // finish. This logic is optimized to support for await loops, where next() + // is only called once at a time. + const lastPromise = this[kLastPromise]; + let promise; + + if (lastPromise) { + promise = new Promise(wrapForNext(lastPromise, this)); + } else { + // Fast path needed to support multiple this.push() + // without triggering the next() queue. + const data = this[kStream].read(); + if (data !== null) { + return PromiseResolve(createIterResult(data, false)); + } + + promise = new Promise(this[kHandlePromise]); + } + + this[kLastPromise] = promise; + + return promise; + }, + + return() { + return finish(this); + }, + + throw(err) { + return finish(this, err); + }, +}, AsyncIteratorPrototype); + +const createReadableStreamAsyncIterator = (stream) => { + if (typeof stream.read !== 'function') { + // v1 stream + + if (!Readable) { + Readable = require('_stream_readable'); + } + + const src = stream; + stream = new Readable({ objectMode: true }).wrap(src); + finished(stream, (err) => destroyImpl.destroyer(src, err)); + } + + const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, { + [kStream]: { value: stream, writable: true }, + [kLastResolve]: { value: null, writable: true }, + [kLastReject]: { value: null, writable: true }, + [kError]: { value: null, writable: true }, + [kEnded]: { + value: stream.readableEnded || stream._readableState.endEmitted, + writable: true + }, + // The function passed to new Promise is cached so we avoid allocating a new + // closure at every run. + [kHandlePromise]: { + value: (resolve, reject) => { + const data = iterator[kStream].read(); + if (data) { + iterator[kLastPromise] = null; + iterator[kLastResolve] = null; + iterator[kLastReject] = null; + resolve(createIterResult(data, false)); + } else { + iterator[kLastResolve] = resolve; + iterator[kLastReject] = reject; + } + }, + writable: true, + }, + }); + iterator[kLastPromise] = null; + + finished(stream, { writable: false }, (err) => { + if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + const reject = iterator[kLastReject]; + // Reject if we are waiting for data in the Promise returned by next() and + // store the error. + if (reject !== null) { + iterator[kLastPromise] = null; + iterator[kLastResolve] = null; + iterator[kLastReject] = null; + reject(err); + } + iterator[kError] = err; + return; + } + + const resolve = iterator[kLastResolve]; + if (resolve !== null) { + iterator[kLastPromise] = null; + iterator[kLastResolve] = null; + iterator[kLastReject] = null; + resolve(createIterResult(undefined, true)); + } + iterator[kEnded] = true; + }); + + stream.on('readable', onReadable.bind(null, iterator)); + + return iterator; +}; + +module.exports = createReadableStreamAsyncIterator; diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index aaee0d24992af8..4786b906f4324c 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -23,7 +23,7 @@ const { let EE; let PassThrough; -let Readable; +let createReadableStreamAsyncIterator; function destroyer(stream, reading, writing, callback) { callback = once(callback); @@ -113,11 +113,11 @@ function makeAsyncIterable(val) { } async function* fromReadable(val) { - if (!Readable) { - Readable = require('_stream_readable'); + if (!createReadableStreamAsyncIterator) { + createReadableStreamAsyncIterator = + require('internal/streams/async_iterator'); } - - yield* Readable.prototype[SymbolAsyncIterator].call(val); + yield* createReadableStreamAsyncIterator(val); } async function pump(iterable, writable, finish) { diff --git a/node.gyp b/node.gyp index dd832ffc2adc12..88942393ff3671 100644 --- a/node.gyp +++ b/node.gyp @@ -222,6 +222,7 @@ 'lib/internal/worker/js_transferable.js', 'lib/internal/watchdog.js', 'lib/internal/streams/lazy_transform.js', + 'lib/internal/streams/async_iterator.js', 'lib/internal/streams/buffer_list.js', 'lib/internal/streams/duplexpair.js', 'lib/internal/streams/from.js', diff --git a/test/parallel/test-readline-async-iterators-destroy.js b/test/parallel/test-readline-async-iterators-destroy.js index 7c14a667062fec..e96f831432b1eb 100644 --- a/test/parallel/test-readline-async-iterators-destroy.js +++ b/test/parallel/test-readline-async-iterators-destroy.js @@ -75,7 +75,6 @@ async function testMutualDestroy() { break; } assert.deepStrictEqual(iteratedLines, expectedLines); - break; } assert.deepStrictEqual(iteratedLines, expectedLines); diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 604ba3afb47fe7..55d16a1c5d363e 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -11,6 +11,17 @@ const { const assert = require('assert'); async function tests() { + { + const AsyncIteratorPrototype = Object.getPrototypeOf( + Object.getPrototypeOf(async function* () {}).prototype); + const rs = new Readable({ + read() {} + }); + assert.strictEqual( + Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())), + AsyncIteratorPrototype); + } + { // v1 stream @@ -42,9 +53,7 @@ async function tests() { }); const iter = Readable.prototype[Symbol.asyncIterator].call(stream); - await iter.next(); - await iter.next(); - await iter.next().catch(common.mustCall((err) => { + iter.next().catch(common.mustCall((err) => { assert.strictEqual(err.message, 'asd'); })); } @@ -180,19 +189,12 @@ async function tests() { resolved.forEach(common.mustCall( (item, i) => assert.strictEqual(item.value, 'hello-' + i), max)); - errors.slice(0, 1).forEach((promise) => { + errors.forEach((promise) => { promise.catch(common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); })); }); - errors.slice(1).forEach((promise) => { - promise.then(common.mustCall(({ done, value }) => { - assert.strictEqual(done, true); - assert.strictEqual(value, undefined); - })); - }); - readable.destroy(new Error('kaboom')); } @@ -282,6 +284,28 @@ async function tests() { assert.strictEqual(received, 1); } + { + // Iterator throw. + + const readable = new Readable({ + objectMode: true, + read() { + this.push('hello'); + } + }); + + readable.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + + const it = readable[Symbol.asyncIterator](); + it.throw(new Error('kaboom')).catch(common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + + assert.strictEqual(readable.destroyed, true); + } + { console.log('destroyed by throw'); const readable = new Readable({ @@ -553,15 +577,12 @@ async function tests() { assert.strictEqual(e, err); })(), (async () => { let e; - let x; try { - x = await d; + await d; } catch (_e) { e = _e; } - assert.strictEqual(e, undefined); - assert.strictEqual(x.done, true); - assert.strictEqual(x.value, undefined); + assert.strictEqual(e, err); })()]); }