diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 8755e22de0d00f..c4c220a5afe34f 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -11,6 +11,7 @@ const { } = primordials; const finished = require('internal/streams/end-of-stream'); +const destroyImpl = require('internal/streams/destroy'); const kLastResolve = Symbol('lastResolve'); const kLastReject = Symbol('lastReject'); @@ -20,6 +21,8 @@ const kLastPromise = Symbol('lastPromise'); const kHandlePromise = Symbol('handlePromise'); const kStream = Symbol('stream'); +let Readable; + function createIterResult(value, done) { return { value, done }; } @@ -60,6 +63,31 @@ function wrapForNext(lastPromise, iter) { const AsyncIteratorPrototype = ObjectGetPrototypeOf( ObjectGetPrototypeOf(async function* () {}).prototype); +function finish(self, err) { + return new Promise((resolve, reject) => { + const stream = self[kStream]; + + // TODO(ronag): Remove this check once finished() handles + // already ended and/or destroyed streams. + const ended = stream.destroyed || stream.readableEnded || + (stream._readableState && stream._readableState.endEmitted); + + if (ended) { + resolve(createIterResult(undefined, true)); + return; + } + + finished(stream, (err) => { + if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + reject(err); + } else { + resolve(createIterResult(undefined, true)); + } + }); + destroyImpl.destroyer(stream, err); + }); +} + const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ get stream() { return this[kStream]; @@ -120,31 +148,27 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ }, return() { - return new Promise((resolve, reject) => { - const stream = this[kStream]; - - // TODO(ronag): Remove this check once finished() handles - // already ended and/or destroyed streams. - const ended = stream.destroyed || stream.readableEnded || - (stream._readableState && stream._readableState.endEmitted); - if (ended) { - resolve(createIterResult(undefined, true)); - return; - } + return finish(this); + }, - finished(stream, (err) => { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - reject(err); - } else { - resolve(createIterResult(undefined, true)); - } - }); - stream.destroy(); - }); + throw(err) { + return finish(this, err); }, }, AsyncIteratorPrototype); const createReadableStreamAsyncIterator = (stream) => { + if (typeof stream.read !== 'function') { + // v1 stream + + if (!Readable) { + Readable = require('_stream_readable'); + } + + const src = stream; + stream = new Readable({ objectMode: true }).wrap(src); + finished(stream, (err) => destroyImpl.destroyer(src, err)); + } + const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, { [kStream]: { value: stream, writable: true }, [kLastResolve]: { value: null, writable: true }, diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index b80fb56a6bf324..8837af2d717781 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -151,8 +151,21 @@ function errorOrDestroy(stream, err, sync) { } } +function isRequest(stream) { + return stream && stream.setHeader && typeof stream.abort === 'function'; +} + +// Normalize destroy for legacy. +function destroyer(stream, err) { + // request.destroy just do .end - .abort is what we want + if (isRequest(stream)) return stream.abort(); + if (isRequest(stream.req)) return stream.req.abort(); + if (typeof stream.destroy === 'function') return stream.destroy(err); + if (typeof stream.close === 'function') return stream.close(); +} module.exports = { + destroyer, destroy, undestroy, errorOrDestroy diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index e0834171bfb8fc..1ead5cdf9f3421 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -12,6 +12,7 @@ const { let eos; const { once } = require('internal/util'); +const destroyImpl = require('internal/streams/destroy'); const { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, @@ -24,18 +25,6 @@ let EE; let PassThrough; let createReadableStreamAsyncIterator; -function isRequest(stream) { - return stream && stream.setHeader && typeof stream.abort === 'function'; -} - -function destroyStream(stream, err) { - // request.destroy just do .end - .abort is what we want - if (isRequest(stream)) return stream.abort(); - if (isRequest(stream.req)) return stream.req.abort(); - if (typeof stream.destroy === 'function') return stream.destroy(err); - if (typeof stream.close === 'function') return stream.close(); -} - function destroyer(stream, reading, writing, callback) { callback = once(callback); @@ -57,7 +46,7 @@ function destroyer(stream, reading, writing, callback) { if (destroyed) return; destroyed = true; - destroyStream(stream, err); + destroyImpl.destroyer(stream, err); callback(err || new ERR_STREAM_DESTROYED('pipe')); }; @@ -101,39 +90,19 @@ function makeAsyncIterable(val) { return val; } else if (isReadable(val)) { // Legacy streams are not Iterable. - return _fromReadable(val); + return fromReadable(val); } else { throw new ERR_INVALID_ARG_TYPE( 'val', ['Readable', 'Iterable', 'AsyncIterable'], val); } } -async function* _fromReadable(val) { +async function* fromReadable(val) { if (!createReadableStreamAsyncIterator) { createReadableStreamAsyncIterator = require('internal/streams/async_iterator'); } - - try { - if (typeof val.read !== 'function') { - // createReadableStreamAsyncIterator does not support - // v1 streams. Convert it into a v2 stream. - - if (!PassThrough) { - PassThrough = require('_stream_passthrough'); - } - - const pt = new PassThrough(); - val - .on('error', (err) => pt.destroy(err)) - .pipe(pt); - yield* createReadableStreamAsyncIterator(pt); - } else { - yield* createReadableStreamAsyncIterator(val); - } - } finally { - destroyStream(val); - } + yield* createReadableStreamAsyncIterator(val); } async function pump(iterable, writable, finish) { diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index ded77e01da3d9b..1da31b45e23ba9 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -1,7 +1,13 @@ 'use strict'; const common = require('../common'); -const { Readable, Transform, PassThrough, pipeline } = require('stream'); +const { + Stream, + Readable, + Transform, + PassThrough, + pipeline +} = require('stream'); const assert = require('assert'); async function tests() { @@ -14,6 +20,61 @@ async function tests() { AsyncIteratorPrototype); } + { + // v1 stream + + const stream = new Stream(); + stream.destroy = common.mustCall(); + process.nextTick(() => { + stream.emit('data', 'hello'); + stream.emit('data', 'world'); + stream.emit('end'); + }); + + let res = ''; + stream[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator]; + for await (const d of stream) { + res += d; + } + assert.strictEqual(res, 'helloworld'); + } + + { + // v1 stream error + + const stream = new Stream(); + stream.close = common.mustCall(); + process.nextTick(() => { + stream.emit('data', 0); + stream.emit('data', 1); + stream.emit('error', new Error('asd')); + }); + + const iter = Readable.prototype[Symbol.asyncIterator].call(stream); + iter.next().catch(common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + } + + { + // Non standard stream cleanup + + const readable = new Readable({ autoDestroy: false, read() {} }); + readable.push('asd'); + readable.push('asd'); + readable.destroy = null; + readable.close = common.mustCall(() => { + readable.emit('close'); + }); + + await (async () => { + for await (const d of readable) { + d; + return; + } + })(); + } + { const readable = new Readable({ objectMode: true, read() {} }); readable.push(0); @@ -221,6 +282,28 @@ async function tests() { assert.strictEqual(received, 1); } + { + // Iterator throw. + + const readable = new Readable({ + objectMode: true, + read() { + this.push('hello'); + } + }); + + readable.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + + const it = readable[Symbol.asyncIterator](); + it.throw(new Error('kaboom')).catch(common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + + assert.strictEqual(readable.destroyed, true); + } + { console.log('destroyed by throw'); const readable = new Readable({