From 711997e7e65738e781d3f85c7b044ba0b4b75f20 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 11 Jan 2020 14:10:53 +0100 Subject: [PATCH 1/5] stream: add async iterator support for v1 streams --- lib/internal/streams/async_iterator.js | 18 ++++++++ .../test-stream-readable-async-iterators.js | 44 ++++++++++++++++++- 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 8755e22de0d00f..93b676ff02892b 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -20,6 +20,8 @@ const kLastPromise = Symbol('lastPromise'); const kHandlePromise = Symbol('handlePromise'); const kStream = Symbol('stream'); +let Readable; + function createIterResult(value, done) { return { value, done }; } @@ -145,6 +147,22 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ }, AsyncIteratorPrototype); const createReadableStreamAsyncIterator = (stream) => { + if (typeof stream.read !== 'function') { + // v1 stream + + if (!Readable) { + Readable = require('_stream_readable'); + } + + const src = stream; + stream = new Readable({ objectMode: true }).wrap(src); + finished(stream, (err) => { + if (typeof src.destroy === 'function') { + src.destroy(err); + } + }); + } + const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, { [kStream]: { value: stream, writable: true }, [kLastResolve]: { value: null, writable: true }, diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index ded77e01da3d9b..26482fa717ef74 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -1,7 +1,13 @@ 'use strict'; const common = require('../common'); -const { Readable, Transform, PassThrough, pipeline } = require('stream'); +const { + Stream, + Readable, + Transform, + PassThrough, + pipeline +} = require('stream'); const assert = require('assert'); async function tests() { @@ -14,6 +20,42 @@ async function tests() { AsyncIteratorPrototype); } + { + // v1 stream + + const stream = new Stream(); + stream.destroy = common.mustCall(); + process.nextTick(() => { + stream.emit('data', 'hello'); + stream.emit('data', 'world'); + stream.emit('end'); + }); + + let res = ''; + stream[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator]; + for await (const d of stream) { + res += d; + } + assert.strictEqual(res, 'helloworld'); + } + + { + // v1 stream error + + const stream = new Stream(); + stream.close = common.mustCall(); + process.nextTick(() => { + stream.emit('data', 0); + stream.emit('data', 1); + stream.emit('error', new Error('asd')); + }); + + const iter = Readable.prototype[Symbol.asyncIterator].call(stream); + iter.next().catch(common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + } + { const readable = new Readable({ objectMode: true, read() {} }); readable.push(0); From 7f8d7dadd9cbc7e59f99256376c4fb65dbc4e5d7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 11 Jan 2020 17:26:49 +0100 Subject: [PATCH 2/5] stream: normalize async iterator stream destroy --- lib/internal/streams/async_iterator.js | 17 +++++++++++------ .../test-stream-readable-async-iterators.js | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 93b676ff02892b..5eefba517b97d2 100644 --- 
a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -22,6 +22,15 @@ const kStream = Symbol('stream'); let Readable; +function destroy(stream, err) { + // request.destroy just do .end - .abort is what we want + if (typeof stream.abort === 'function') return stream.abort(); + if (stream.req && + typeof stream.req.abort === 'function') return stream.req.abort(); + if (typeof stream.destroy === 'function') return stream.destroy(err); + if (typeof stream.close === 'function') return stream.close(); +} + function createIterResult(value, done) { return { value, done }; } @@ -141,7 +150,7 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ resolve(createIterResult(undefined, true)); } }); - stream.destroy(); + destroy(stream); }); }, }, AsyncIteratorPrototype); @@ -156,11 +165,7 @@ const createReadableStreamAsyncIterator = (stream) => { const src = stream; stream = new Readable({ objectMode: true }).wrap(src); - finished(stream, (err) => { - if (typeof src.destroy === 'function') { - src.destroy(err); - } - }); + finished(stream, (err) => destroy(src, err)); } const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, { diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 26482fa717ef74..b783b5f2ee8440 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -56,6 +56,25 @@ async function tests() { })); } + { + // Non standard stream cleanup + + const readable = new Readable({ autoDestroy: false, read() {} }); + readable.push('asd'); + readable.push('asd'); + readable.destroy = null; + readable.close = common.mustCall(() => { + readable.emit('close'); + }); + + await (async () => { + for await (const d of readable) { + d; + return; + } + })(); + } + { const readable = new Readable({ objectMode: true, read() {} }); readable.push(0); From 0620728a9a53b0daf1b068a0cdc41ecfe200d8c4 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 11 Jan 2020 18:01:43 +0100 Subject: [PATCH 3/5] stream: implement throw for async iterator --- lib/internal/streams/async_iterator.js | 49 +++++++++++-------- .../test-stream-readable-async-iterators.js | 22 +++++++++ 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 5eefba517b97d2..6d798ec2ffe21f 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -71,6 +71,31 @@ function wrapForNext(lastPromise, iter) { const AsyncIteratorPrototype = ObjectGetPrototypeOf( ObjectGetPrototypeOf(async function* () {}).prototype); +function finish(self, err) { + return new Promise((resolve, reject) => { + const stream = self[kStream]; + + // TODO(ronag): Remove this check once finished() handles + // already ended and/or destroyed streams. 
+ const ended = stream.destroyed || stream.readableEnded || + (stream._readableState && stream._readableState.endEmitted); + + if (ended) { + resolve(createIterResult(undefined, true)); + return; + } + + finished(stream, (err) => { + if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + reject(err); + } else { + resolve(createIterResult(undefined, true)); + } + }); + destroy(stream, err); + }); +} + const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ get stream() { return this[kStream]; @@ -131,27 +156,11 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ }, return() { - return new Promise((resolve, reject) => { - const stream = this[kStream]; - - // TODO(ronag): Remove this check once finished() handles - // already ended and/or destroyed streams. - const ended = stream.destroyed || stream.readableEnded || - (stream._readableState && stream._readableState.endEmitted); - if (ended) { - resolve(createIterResult(undefined, true)); - return; - } + return finish(this); + }, - finished(stream, (err) => { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - reject(err); - } else { - resolve(createIterResult(undefined, true)); - } - }); - destroy(stream); - }); + throw(err) { + return finish(this, err); }, }, AsyncIteratorPrototype); diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index b783b5f2ee8440..1da31b45e23ba9 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -282,6 +282,28 @@ async function tests() { assert.strictEqual(received, 1); } + { + // Iterator throw. + + const readable = new Readable({ + objectMode: true, + read() { + this.push('hello'); + } + }); + + readable.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + + const it = readable[Symbol.asyncIterator](); + it.throw(new Error('kaboom')).catch(common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + + assert.strictEqual(readable.destroyed, true); + } + { console.log('destroyed by throw'); const readable = new Readable({ From 0ea887c07c828aed2898d1d60f551cdbe3e69226 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 18 Jan 2020 09:59:25 +0100 Subject: [PATCH 4/5] stream: simplify pipeline --- lib/internal/streams/pipeline.js | 40 +++++++------------------------- 1 file changed, 8 insertions(+), 32 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index e0834171bfb8fc..9d383467424a0f 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -28,14 +28,6 @@ function isRequest(stream) { return stream && stream.setHeader && typeof stream.abort === 'function'; } -function destroyStream(stream, err) { - // request.destroy just do .end - .abort is what we want - if (isRequest(stream)) return stream.abort(); - if (isRequest(stream.req)) return stream.req.abort(); - if (typeof stream.destroy === 'function') return stream.destroy(err); - if (typeof stream.close === 'function') return stream.close(); -} - function destroyer(stream, reading, writing, callback) { callback = once(callback); @@ -57,7 +49,11 @@ function destroyer(stream, reading, writing, callback) { if (destroyed) return; destroyed = true; - destroyStream(stream, err); + // request.destroy just do .end - .abort is what we want + if (isRequest(stream)) return stream.abort(); + if (isRequest(stream.req)) return stream.req.abort(); + if 
(typeof stream.destroy === 'function') return stream.destroy(err); + if (typeof stream.close === 'function') return stream.close(); callback(err || new ERR_STREAM_DESTROYED('pipe')); }; @@ -101,39 +97,19 @@ function makeAsyncIterable(val) { return val; } else if (isReadable(val)) { // Legacy streams are not Iterable. - return _fromReadable(val); + return fromReadable(val); } else { throw new ERR_INVALID_ARG_TYPE( 'val', ['Readable', 'Iterable', 'AsyncIterable'], val); } } -async function* _fromReadable(val) { +async function* fromReadable(val) { if (!createReadableStreamAsyncIterator) { createReadableStreamAsyncIterator = require('internal/streams/async_iterator'); } - - try { - if (typeof val.read !== 'function') { - // createReadableStreamAsyncIterator does not support - // v1 streams. Convert it into a v2 stream. - - if (!PassThrough) { - PassThrough = require('_stream_passthrough'); - } - - const pt = new PassThrough(); - val - .on('error', (err) => pt.destroy(err)) - .pipe(pt); - yield* createReadableStreamAsyncIterator(pt); - } else { - yield* createReadableStreamAsyncIterator(val); - } - } finally { - destroyStream(val); - } + yield* createReadableStreamAsyncIterator(val); } async function pump(iterable, writable, finish) { From 3201faa99c915524487b5b20df38c884be6ea021 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 21 Jan 2020 20:47:33 +0100 Subject: [PATCH 5/5] stream: re-use legacy destroyer --- lib/internal/streams/async_iterator.js | 14 +++----------- lib/internal/streams/destroy.js | 13 +++++++++++++ lib/internal/streams/pipeline.js | 11 ++--------- 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 6d798ec2ffe21f..c4c220a5afe34f 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -11,6 +11,7 @@ const { } = primordials; const finished = require('internal/streams/end-of-stream'); +const destroyImpl = require('internal/streams/destroy'); const kLastResolve = Symbol('lastResolve'); const kLastReject = Symbol('lastReject'); @@ -22,15 +23,6 @@ const kStream = Symbol('stream'); let Readable; -function destroy(stream, err) { - // request.destroy just do .end - .abort is what we want - if (typeof stream.abort === 'function') return stream.abort(); - if (stream.req && - typeof stream.req.abort === 'function') return stream.req.abort(); - if (typeof stream.destroy === 'function') return stream.destroy(err); - if (typeof stream.close === 'function') return stream.close(); -} - function createIterResult(value, done) { return { value, done }; } @@ -92,7 +84,7 @@ function finish(self, err) { resolve(createIterResult(undefined, true)); } }); - destroy(stream, err); + destroyImpl.destroyer(stream, err); }); } @@ -174,7 +166,7 @@ const createReadableStreamAsyncIterator = (stream) => { const src = stream; stream = new Readable({ objectMode: true }).wrap(src); - finished(stream, (err) => destroy(src, err)); + finished(stream, (err) => destroyImpl.destroyer(src, err)); } const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, { diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index b80fb56a6bf324..8837af2d717781 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -151,8 +151,21 @@ function errorOrDestroy(stream, err, sync) { } } +function isRequest(stream) { + return stream && stream.setHeader && typeof stream.abort === 'function'; +} + +// Normalize destroy for legacy. 
+function destroyer(stream, err) { + // request.destroy just do .end - .abort is what we want + if (isRequest(stream)) return stream.abort(); + if (isRequest(stream.req)) return stream.req.abort(); + if (typeof stream.destroy === 'function') return stream.destroy(err); + if (typeof stream.close === 'function') return stream.close(); +} module.exports = { + destroyer, destroy, undestroy, errorOrDestroy diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 9d383467424a0f..1ead5cdf9f3421 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -12,6 +12,7 @@ const { let eos; const { once } = require('internal/util'); +const destroyImpl = require('internal/streams/destroy'); const { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, @@ -24,10 +25,6 @@ let EE; let PassThrough; let createReadableStreamAsyncIterator; -function isRequest(stream) { - return stream && stream.setHeader && typeof stream.abort === 'function'; -} - function destroyer(stream, reading, writing, callback) { callback = once(callback); @@ -49,11 +46,7 @@ function destroyer(stream, reading, writing, callback) { if (destroyed) return; destroyed = true; - // request.destroy just do .end - .abort is what we want - if (isRequest(stream)) return stream.abort(); - if (isRequest(stream.req)) return stream.req.abort(); - if (typeof stream.destroy === 'function') return stream.destroy(err); - if (typeof stream.close === 'function') return stream.close(); + destroyImpl.destroyer(stream, err); callback(err || new ERR_STREAM_DESTROYED('pipe')); };
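
Taken together, these patches let `for await...of` consume legacy (v1) streams that lack a `read()` method, add `throw()` to the readable async iterator, and consolidate the legacy destroy logic into a shared `destroyer()` helper. A minimal usage sketch follows; it assumes a Node.js build with these patches applied, and the stream contents and error message are illustrative only (they mirror the new test cases above).

    'use strict';

    const { Stream, Readable } = require('stream');

    async function main() {
      // Consume a legacy (v1) stream: it has no read() method, so the async
      // iterator wraps it in a Readable({ objectMode: true }) internally.
      const legacy = new Stream();
      process.nextTick(() => {
        legacy.emit('data', 'hello');
        legacy.emit('data', 'world');
        legacy.emit('end');
      });

      legacy[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator];
      let out = '';
      for await (const chunk of legacy) {
        out += chunk;
      }
      console.log(out); // 'helloworld'

      // iterator.throw(err) destroys the underlying stream with that error
      // and returns a promise that rejects with the same error.
      const readable = new Readable({
        objectMode: true,
        read() { this.push('x'); }
      });
      readable.on('error', () => {}); // swallow the injected error

      const it = readable[Symbol.asyncIterator]();
      await it.throw(new Error('kaboom')).catch((err) => {
        console.log(err.message, readable.destroyed); // 'kaboom' true
      });
    }

    main();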