From 0334a5977176373d5b5d63f3148cb12d7693c0b1 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 7 Jul 2021 11:33:55 +0200 Subject: [PATCH 1/4] stream: unify stream utils Unify stream helps into utils. --- lib/internal/streams/destroy.js | 22 +--- lib/internal/streams/end-of-stream.js | 88 ++++--------- lib/internal/streams/pipeline.js | 8 +- lib/internal/streams/utils.js | 178 +++++++++++++++++++++++++- test/parallel/test-stream-finished.js | 3 +- test/parallel/test-stream-pipeline.js | 2 +- 6 files changed, 211 insertions(+), 90 deletions(-) diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index df2d9f7f71987a..4dbcf7ba488837 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -10,6 +10,11 @@ const { const { Symbol, } = primordials; +const { + kDestroyed, + isDestroyed, + isFinished, +} = require('internal/streams/utils'); const kDestroy = Symbol('kDestroy'); const kConstruct = Symbol('kConstruct'); @@ -364,8 +369,6 @@ function isRequest(stream) { return stream && stream.setHeader && typeof stream.abort === 'function'; } -const kDestroyed = Symbol('kDestroyed'); - function emitCloseLegacy(stream) { stream.emit('close'); } @@ -375,25 +378,13 @@ function emitErrorCloseLegacy(stream, err) { process.nextTick(emitCloseLegacy, stream); } -function isDestroyed(stream) { - return stream.destroyed || stream[kDestroyed]; -} - -function isReadable(stream) { - return stream.readable && !stream.readableEnded && !isDestroyed(stream); -} - -function isWritable(stream) { - return stream.writable && !stream.writableEnded && !isDestroyed(stream); -} - // Normalize destroy for legacy. function destroyer(stream, err) { if (isDestroyed(stream)) { return; } - if (!err && (isReadable(stream) || isWritable(stream))) { + if (!err && !isFinished(stream)) { err = new AbortError(); } @@ -422,7 +413,6 @@ function destroyer(stream, err) { module.exports = { kDestroy, - isDestroyed, construct, destroyer, destroy, diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index efc2441c51ee39..0b691c53176956 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -17,48 +17,23 @@ const { validateObject, } = require('internal/validators'); +const { + isClosed, + isReadable, + isReadableStream, + isReadableFinished, + isWritable, + isWritableStream, + isWritableFinished, + willEmitClose: _willEmitClose, +} = require('internal/streams/utils'); + function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } -function isServerResponse(stream) { - return ( - typeof stream._sent100 === 'boolean' && - typeof stream._removedConnection === 'boolean' && - typeof stream._removedContLen === 'boolean' && - typeof stream._removedTE === 'boolean' && - typeof stream._closed === 'boolean' - ); -} - -function isReadable(stream) { - return typeof stream.readable === 'boolean' || - typeof stream.readableEnded === 'boolean' || - !!stream._readableState; -} - -function isWritable(stream) { - return typeof stream.writable === 'boolean' || - typeof stream.writableEnded === 'boolean' || - !!stream._writableState; -} - -function isWritableFinished(stream) { - if (stream.writableFinished) return true; - const wState = stream._writableState; - if (!wState || wState.errored) return false; - return wState.finished || (wState.ended && wState.length === 0); -} - const nop = () => {}; -function isReadableEnded(stream) { - if (stream.readableEnded) return true; - const rState = stream._readableState; - if (!rState || rState.errored) return false; - return rState.endEmitted || (rState.ended && rState.length === 0); -} - function eos(stream, options, callback) { if (arguments.length === 2) { callback = options; @@ -74,13 +49,12 @@ function eos(stream, options, callback) { callback = once(callback); const readable = options.readable || - (options.readable !== false && isReadable(stream)); + (options.readable !== false && isReadableStream(stream)); const writable = options.writable || - (options.writable !== false && isWritable(stream)); + (options.writable !== false && isWritableStream(stream)); const wState = stream._writableState; const rState = stream._readableState; - const state = wState || rState; const onlegacyfinish = () => { if (!stream.writable) onfinish(); @@ -89,16 +63,13 @@ function eos(stream, options, callback) { // TODO (ronag): Improve soft detection to include core modules and // common ecosystem modules that do properly emit 'close' but fail // this generic check. - let willEmitClose = isServerResponse(stream) || ( - state && - state.autoDestroy && - state.emitClose && - state.closed === false && - isReadable(stream) === readable && - isWritable(stream) === writable + let willEmitClose = ( + _willEmitClose(stream) && + isReadableStream(stream) === readable && + isWritableStream(stream) === writable ); - let writableFinished = stream.writableFinished || wState?.finished; + let writableFinished = isWritableFinished(stream, false); const onfinish = () => { writableFinished = true; // Stream should not be destroyed here. If it is that @@ -107,12 +78,12 @@ function eos(stream, options, callback) { if (stream.destroyed) willEmitClose = false; if (willEmitClose && (!stream.readable || readable)) return; - if (!readable || readableEnded) callback.call(stream); + if (!readable || readableFinished) callback.call(stream); }; - let readableEnded = stream.readableEnded || rState?.endEmitted; + let readableFinished = isReadableFinished(stream, false); const onend = () => { - readableEnded = true; + readableFinished = true; // Stream should not be destroyed here. If it is that // means that user space is doing something differently and // we cannot trust willEmitClose. @@ -126,7 +97,7 @@ function eos(stream, options, callback) { callback.call(stream, err); }; - let closed = wState?.closed || rState?.closed; + let closed = isClosed(stream); const onclose = () => { closed = true; @@ -137,13 +108,13 @@ function eos(stream, options, callback) { return callback.call(stream, errored); } - if (readable && !readableEnded) { - if (!isReadableEnded(stream)) + if (readable && !readableFinished) { + if (!isReadableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); } if (writable && !writableFinished) { - if (!isWritableFinished(stream)) + if (!isWritableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); } @@ -185,19 +156,16 @@ function eos(stream, options, callback) { } } else if ( !readable && - (!willEmitClose || stream.readable) && + (!willEmitClose || isReadable(stream)) && writableFinished ) { process.nextTick(onclose); } else if ( !writable && - (!willEmitClose || stream.writable) && - readableEnded + (!willEmitClose || isWritable(stream)) && + readableFinished ) { process.nextTick(onclose); - } else if (!wState && !rState && stream._closed === true) { - // _closed is for OutgoingMessage which is not a proper Writable. - process.nextTick(onclose); } else if ((rState && stream.req && stream.aborted)) { process.nextTick(onclose); } diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 5759dbd4a580a3..ad08054408fa0e 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -27,7 +27,7 @@ const { validateCallback } = require('internal/validators'); const { isIterable, - isReadable, + isReadableStream, isStream, } = require('internal/streams/utils'); @@ -87,7 +87,7 @@ function popCallback(streams) { function makeAsyncIterable(val) { if (isIterable(val)) { return val; - } else if (isReadable(val)) { + } else if (isReadableStream(val)) { // Legacy streams are not Iterable. return fromReadable(val); } @@ -216,7 +216,7 @@ function pipeline(...streams) { throw new ERR_INVALID_RETURN_VALUE( 'Iterable, AsyncIterable or Stream', 'source', ret); } - } else if (isIterable(stream) || isReadable(stream)) { + } else if (isIterable(stream) || isReadableStream(stream)) { ret = stream; } else { throw new ERR_INVALID_ARG_TYPE( @@ -272,7 +272,7 @@ function pipeline(...streams) { destroys.push(destroyer(ret, false, true, finish)); } } else if (isStream(stream)) { - if (isReadable(ret)) { + if (isReadableStream(ret)) { ret.pipe(stream); // Compat. Before node v10.12.0 stdio used to throw an error so diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 62eea022685e18..796634af67a953 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -1,22 +1,34 @@ 'use strict'; const { + Symbol, SymbolAsyncIterator, SymbolIterator, } = primordials; -function isReadable(obj) { - return !!(obj && typeof obj.pipe === 'function' && - typeof obj.on === 'function'); +const kDestroyed = Symbol('kDestroyed'); + +function isReadableStream(obj) { + return !!( + obj && + typeof obj.pipe === 'function' && + typeof obj.on === 'function' && + (!obj._writableState || obj._readableState?.readable !== false) && // Duplex + (!obj._writableState || obj._readableState) // Writable has .pipe. + ); } -function isWritable(obj) { - return !!(obj && typeof obj.write === 'function' && - typeof obj.on === 'function'); +function isWritableStream(obj) { + return !!( + obj && + typeof obj.write === 'function' && + typeof obj.on === 'function' && + (!obj._readableState || obj._writableState?.writable !== false) // Duplex + ); } function isStream(obj) { - return isReadable(obj) || isWritable(obj); + return isReadableStream(obj) || isWritableStream(obj); } function isIterable(obj, isAsync) { @@ -27,8 +39,160 @@ function isIterable(obj, isAsync) { typeof obj[SymbolIterator] === 'function'; } +function isDestroyed(stream) { + if (!isStream(stream)) return null; + const wState = stream._writableState; + const rState = stream._readableState; + const state = wState || rState; + return !!(stream.destroyed || stream[kDestroyed] || state?.destroyed); +} + +// Have been end():d. +function isWritableEnded(stream) { + if (!isWritableStream(stream)) return null; + if (stream.writableEnded === true) return true; + const wState = stream._writableState; + if (wState?.errored) return false; + if (typeof wState?.ended !== 'boolean') return null; + return wState.ended; +} + +// Have emitted 'finish'. +function isWritableFinished(stream, strict) { + if (!isWritableStream(stream)) return null; + if (stream.writableFinished === true) return true; + const wState = stream._writableState; + if (wState?.errored) return false; + if (typeof wState?.finished !== 'boolean') return null; + return !!( + wState.finished || + (strict === false && wState.ended === true && wState.length === 0) + ); +} + +// Have been push(null):d. +function isReadableEnded(stream) { + if (!isReadableStream(stream)) return null; + if (stream.readableEnded === true) return true; + const rState = stream._readableState; + if (!rState || rState.errored) return false; + if (typeof rState?.ended !== 'boolean') return null; + return rState.ended; +} + +// Have emitted 'end'. +function isReadableFinished(stream, strict) { + if (!isReadableStream(stream)) return null; + const rState = stream._readableState; + if (rState?.errored) return false; + if (typeof rState?.endEmitted !== 'boolean') return null; + return !!( + rState.endEmitted || + (strict === false && rState.ended === true && rState.length === 0) + ); +} + +function isReadable(stream) { + const r = isReadableStream(stream); + if (r === null || typeof stream.readable !== 'boolean') return null; + if (isDestroyed(stream)) return false; + return r && stream.readable && !isReadableFinished(stream); +} + +function isWritable(stream) { + const r = isWritableStream(stream); + if (r === null || typeof stream.writable !== 'boolean') return null; + if (isDestroyed(stream)) return false; + return r && stream.writable && !isWritableEnded(stream); +} + +function isFinished(stream, opts) { + if (!isStream(stream)) { + return null; + } + + if (isDestroyed(stream)) { + return true; + } + + if (opts?.readable !== false && isReadable(stream)) { + return false; + } + + if (opts?.writable !== false && isWritable(stream)) { + return false; + } + + return true; +} + +function isClosed(stream) { + if (!isStream(stream)) { + return null; + } + + const wState = stream._writableState; + const rState = stream._readableState; + + if ( + typeof wState?.closed === 'boolean' || + typeof rState?.closed === 'boolean' + ) { + return wState?.closed || rState?.closed; + } + + if (typeof stream._closed === 'boolean' && isOutgoingMessage(stream)) { + return stream._closed; + } + + return null; +} + +function isOutgoingMessage(stream) { + return ( + typeof stream._closed === 'boolean' && + typeof stream._defaultKeepAlive === 'boolean' && + typeof stream._removedConnection === 'boolean' && + typeof stream._removedContLen === 'boolean' + ); +} + +function isServerResponse(stream) { + return ( + typeof stream._sent100 === 'boolean' && + isOutgoingMessage(stream) + ); +} + +function willEmitClose(stream) { + if (!isStream(stream)) return null; + + const wState = stream._writableState; + const rState = stream._readableState; + const state = wState || rState; + + return (!state && isServerResponse(stream)) || !!( + state && + state.autoDestroy && + state.emitClose && + state.closed === false + ); +} + module.exports = { + kDestroyed, + isClosed, + isDestroyed, + isFinished, isIterable, isReadable, + isReadableStream, + isReadableEnded, + isReadableFinished, isStream, + isWritable, + isWritableStream, + isWritableEnded, + isWritableFinished, + willEmitClose, }; diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 8e371911698336..8ada0c4c348cb7 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -415,7 +415,7 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); d._writableState = {}; d._writableState.finished = true; finished(d, { readable: false, writable: true }, common.mustCall((err) => { - assert.strictEqual(err, undefined); + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); })); d._writableState.errored = true; d.emit('close'); @@ -586,7 +586,6 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); }); } - { const w = new Writable({ write(chunk, encoding, callback) { diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index e2e5fe2e0d561a..1f4474e6b5fce6 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1035,7 +1035,7 @@ const net = require('net'); const dst = new PassThrough(); dst.readable = false; pipeline(src, dst, common.mustSucceed(() => { - assert.strictEqual(dst.destroyed, false); + assert.strictEqual(dst.destroyed, true); })); src.end(); } From ad257e56cb36c140b7b536eec44e3461140a3234 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 7 Jul 2021 12:47:08 +0200 Subject: [PATCH 2/4] fixup: isServerRequest --- lib/_http_client.js | 2 -- lib/_http_incoming.js | 6 ------ lib/internal/streams/destroy.js | 7 ++++--- lib/internal/streams/utils.js | 10 ++++++++++ 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/lib/_http_client.js b/lib/_http_client.js index 598b585bcfa383..280c6ebab76073 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -53,7 +53,6 @@ const { prepareError, } = require('_http_common'); const { OutgoingMessage } = require('_http_outgoing'); -const { kDestroy } = require('internal/streams/destroy'); const Agent = require('_http_agent'); const { Buffer } = require('buffer'); const { defaultTriggerAsyncIdScope } = require('internal/async_hooks'); @@ -610,7 +609,6 @@ function parserOnIncomingClient(res, shouldKeepAlive) { DTRACE_HTTP_CLIENT_RESPONSE(socket, req); req.res = res; res.req = req; - res[kDestroy] = null; // Add our listener first, so that we guarantee socket cleanup res.on('end', responseOnEnd); diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index a92687ce37bfbc..31b7db6f6c7c99 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -31,7 +31,6 @@ const { } = primordials; const { Readable, finished } = require('stream'); -const { kDestroy } = require('internal/streams/destroy'); const kHeaders = Symbol('kHeaders'); const kHeadersCount = Symbol('kHeadersCount'); @@ -199,11 +198,6 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) { } }; -IncomingMessage.prototype[kDestroy] = function(err) { - this.socket = null; - this.destroy(err); -}; - IncomingMessage.prototype._addHeaderLines = _addHeaderLines; function _addHeaderLines(headers, n) { if (headers && headers.length) { diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 4dbcf7ba488837..dd81fdacd5d93d 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -14,6 +14,7 @@ const { kDestroyed, isDestroyed, isFinished, + isServerRequest } = require('internal/streams/utils'); const kDestroy = Symbol('kDestroy'); @@ -389,8 +390,9 @@ function destroyer(stream, err) { } // TODO: Remove isRequest branches. - if (typeof stream[kDestroy] === 'function') { - stream[kDestroy](err); + if (isServerRequest(stream)) { + stream.socket = null; + stream.destroy(err); } else if (isRequest(stream)) { stream.abort(); } else if (isRequest(stream.req)) { @@ -412,7 +414,6 @@ function destroyer(stream, err) { } module.exports = { - kDestroy, construct, destroyer, destroy, diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 796634af67a953..fa824842d60b64 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -164,6 +164,14 @@ function isServerResponse(stream) { ); } +function isServerRequest(stream) { + return ( + typeof stream._consuming === 'boolean' && + typeof stream._dumped === 'boolean' && + stream.req?.upgradeOrConnect === undefined + ); +} + function willEmitClose(stream) { if (!isStream(stream)) return null; @@ -194,5 +202,7 @@ module.exports = { isWritableStream, isWritableEnded, isWritableFinished, + isServerRequest, + isServerResponse, willEmitClose, }; From 57cfc0d6993e6477788fe8eeaae093cef9d3a8b1 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 9 Jul 2021 20:22:45 +0200 Subject: [PATCH 3/4] fixup: NodeStream --- lib/internal/streams/add-abort-signal.js | 4 +-- lib/internal/streams/end-of-stream.js | 12 ++++----- lib/internal/streams/pipeline.js | 14 +++++----- lib/internal/streams/utils.js | 34 ++++++++++++------------ lib/stream/promises.js | 4 +-- 5 files changed, 34 insertions(+), 34 deletions(-) diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index ba0da5e8bc4ac2..80814f0936782d 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -18,13 +18,13 @@ const validateAbortSignal = (signal, name) => { } }; -function isStream(obj) { +function isNodeStream(obj) { return !!(obj && typeof obj.pipe === 'function'); } module.exports.addAbortSignal = function addAbortSignal(signal, stream) { validateAbortSignal(signal, 'signal'); - if (!isStream(stream)) { + if (!isNodeStream(stream)) { throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream); } return module.exports.addAbortSignalNoValidate(signal, stream); diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 0b691c53176956..1550259d02de00 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -20,10 +20,10 @@ const { const { isClosed, isReadable, - isReadableStream, + isReadableNodeStream, isReadableFinished, isWritable, - isWritableStream, + isWritableNodeStream, isWritableFinished, willEmitClose: _willEmitClose, } = require('internal/streams/utils'); @@ -49,9 +49,9 @@ function eos(stream, options, callback) { callback = once(callback); const readable = options.readable || - (options.readable !== false && isReadableStream(stream)); + (options.readable !== false && isReadableNodeStream(stream)); const writable = options.writable || - (options.writable !== false && isWritableStream(stream)); + (options.writable !== false && isWritableNodeStream(stream)); const wState = stream._writableState; const rState = stream._readableState; @@ -65,8 +65,8 @@ function eos(stream, options, callback) { // this generic check. let willEmitClose = ( _willEmitClose(stream) && - isReadableStream(stream) === readable && - isWritableStream(stream) === writable + isReadableNodeStream(stream) === readable && + isWritableNodeStream(stream) === writable ); let writableFinished = isWritableFinished(stream, false); diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ad08054408fa0e..c98b3b3d21b633 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -27,8 +27,8 @@ const { validateCallback } = require('internal/validators'); const { isIterable, - isReadableStream, - isStream, + isReadableNodeStream, + isNodeStream, } = require('internal/streams/utils'); let PassThrough; @@ -87,7 +87,7 @@ function popCallback(streams) { function makeAsyncIterable(val) { if (isIterable(val)) { return val; - } else if (isReadableStream(val)) { + } else if (isReadableNodeStream(val)) { // Legacy streams are not Iterable. return fromReadable(val); } @@ -204,7 +204,7 @@ function pipeline(...streams) { const reading = i < streams.length - 1; const writing = i > 0; - if (isStream(stream)) { + if (isNodeStream(stream)) { finishCount++; destroys.push(destroyer(stream, reading, writing, finish)); } @@ -216,7 +216,7 @@ function pipeline(...streams) { throw new ERR_INVALID_RETURN_VALUE( 'Iterable, AsyncIterable or Stream', 'source', ret); } - } else if (isIterable(stream) || isReadableStream(stream)) { + } else if (isIterable(stream) || isReadableNodeStream(stream)) { ret = stream; } else { throw new ERR_INVALID_ARG_TYPE( @@ -271,8 +271,8 @@ function pipeline(...streams) { finishCount++; destroys.push(destroyer(ret, false, true, finish)); } - } else if (isStream(stream)) { - if (isReadableStream(ret)) { + } else if (isNodeStream(stream)) { + if (isReadableNodeStream(ret)) { ret.pipe(stream); // Compat. Before node v10.12.0 stdio used to throw an error so diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index fa824842d60b64..0cd9c8eb36062f 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -8,7 +8,7 @@ const { const kDestroyed = Symbol('kDestroyed'); -function isReadableStream(obj) { +function isReadableNodeStream(obj) { return !!( obj && typeof obj.pipe === 'function' && @@ -18,7 +18,7 @@ function isReadableStream(obj) { ); } -function isWritableStream(obj) { +function isWritableNodeStream(obj) { return !!( obj && typeof obj.write === 'function' && @@ -27,8 +27,8 @@ function isWritableStream(obj) { ); } -function isStream(obj) { - return isReadableStream(obj) || isWritableStream(obj); +function isNodeStream(obj) { + return isReadableNodeStream(obj) || isWritableNodeStream(obj); } function isIterable(obj, isAsync) { @@ -40,7 +40,7 @@ function isIterable(obj, isAsync) { } function isDestroyed(stream) { - if (!isStream(stream)) return null; + if (!isNodeStream(stream)) return null; const wState = stream._writableState; const rState = stream._readableState; const state = wState || rState; @@ -49,7 +49,7 @@ function isDestroyed(stream) { // Have been end():d. function isWritableEnded(stream) { - if (!isWritableStream(stream)) return null; + if (!isWritableNodeStream(stream)) return null; if (stream.writableEnded === true) return true; const wState = stream._writableState; if (wState?.errored) return false; @@ -59,7 +59,7 @@ function isWritableEnded(stream) { // Have emitted 'finish'. function isWritableFinished(stream, strict) { - if (!isWritableStream(stream)) return null; + if (!isWritableNodeStream(stream)) return null; if (stream.writableFinished === true) return true; const wState = stream._writableState; if (wState?.errored) return false; @@ -72,7 +72,7 @@ function isWritableFinished(stream, strict) { // Have been push(null):d. function isReadableEnded(stream) { - if (!isReadableStream(stream)) return null; + if (!isReadableNodeStream(stream)) return null; if (stream.readableEnded === true) return true; const rState = stream._readableState; if (!rState || rState.errored) return false; @@ -82,7 +82,7 @@ function isReadableEnded(stream) { // Have emitted 'end'. function isReadableFinished(stream, strict) { - if (!isReadableStream(stream)) return null; + if (!isReadableNodeStream(stream)) return null; const rState = stream._readableState; if (rState?.errored) return false; if (typeof rState?.endEmitted !== 'boolean') return null; @@ -93,21 +93,21 @@ function isReadableFinished(stream, strict) { } function isReadable(stream) { - const r = isReadableStream(stream); + const r = isReadableNodeStream(stream); if (r === null || typeof stream.readable !== 'boolean') return null; if (isDestroyed(stream)) return false; return r && stream.readable && !isReadableFinished(stream); } function isWritable(stream) { - const r = isWritableStream(stream); + const r = isWritableNodeStream(stream); if (r === null || typeof stream.writable !== 'boolean') return null; if (isDestroyed(stream)) return false; return r && stream.writable && !isWritableEnded(stream); } function isFinished(stream, opts) { - if (!isStream(stream)) { + if (!isNodeStream(stream)) { return null; } @@ -127,7 +127,7 @@ function isFinished(stream, opts) { } function isClosed(stream) { - if (!isStream(stream)) { + if (!isNodeStream(stream)) { return null; } @@ -173,7 +173,7 @@ function isServerRequest(stream) { } function willEmitClose(stream) { - if (!isStream(stream)) return null; + if (!isNodeStream(stream)) return null; const wState = stream._writableState; const rState = stream._readableState; @@ -194,12 +194,12 @@ module.exports = { isFinished, isIterable, isReadable, - isReadableStream, + isReadableNodeStream, isReadableEnded, isReadableFinished, - isStream, + isNodeStream, isWritable, - isWritableStream, + isWritableNodeStream, isWritableEnded, isWritableFinished, isServerRequest, diff --git a/lib/stream/promises.js b/lib/stream/promises.js index f5d873197323e8..8a8e66417c6057 100644 --- a/lib/stream/promises.js +++ b/lib/stream/promises.js @@ -15,7 +15,7 @@ const { const { isIterable, - isStream, + isNodeStream, } = require('internal/streams/utils'); const pl = require('internal/streams/pipeline'); @@ -26,7 +26,7 @@ function pipeline(...streams) { let signal; const lastArg = streams[streams.length - 1]; if (lastArg && typeof lastArg === 'object' && - !isStream(lastArg) && !isIterable(lastArg)) { + !isNodeStream(lastArg) && !isIterable(lastArg)) { const options = ArrayPrototypePop(streams); signal = options.signal; validateAbortSignal(signal, 'options.signal'); From e08ea7f38fb03f26d897fb8fcf86e6f56f2482fe Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 10 Jul 2021 18:50:24 +0200 Subject: [PATCH 4/4] fixup --- lib/internal/streams/end-of-stream.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 1550259d02de00..274c2796edd443 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -157,13 +157,13 @@ function eos(stream, options, callback) { } else if ( !readable && (!willEmitClose || isReadable(stream)) && - writableFinished + (writableFinished || !isWritable(stream)) ) { process.nextTick(onclose); } else if ( !writable && (!willEmitClose || isWritable(stream)) && - readableFinished + (readableFinished || !isReadable(stream)) ) { process.nextTick(onclose); } else if ((rState && stream.req && stream.aborted)) {