From cda7ec939a31ccd58ada776b14844fe4609fa320 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 7 Jul 2021 11:33:55 +0200 Subject: [PATCH] stream: unify stream utils Unify stream helps into utils. --- lib/internal/streams/destroy.js | 22 ++-- lib/internal/streams/end-of-stream.js | 76 ++++--------- lib/internal/streams/pipeline.js | 8 +- lib/internal/streams/utils.js | 147 ++++++++++++++++++++++++-- test/parallel/test-stream-finished.js | 2 +- test/parallel/test-stream-pipeline.js | 2 +- 6 files changed, 174 insertions(+), 83 deletions(-) diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index df2d9f7f71987a..4dbcf7ba488837 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -10,6 +10,11 @@ const { const { Symbol, } = primordials; +const { + kDestroyed, + isDestroyed, + isFinished, +} = require('internal/streams/utils'); const kDestroy = Symbol('kDestroy'); const kConstruct = Symbol('kConstruct'); @@ -364,8 +369,6 @@ function isRequest(stream) { return stream && stream.setHeader && typeof stream.abort === 'function'; } -const kDestroyed = Symbol('kDestroyed'); - function emitCloseLegacy(stream) { stream.emit('close'); } @@ -375,25 +378,13 @@ function emitErrorCloseLegacy(stream, err) { process.nextTick(emitCloseLegacy, stream); } -function isDestroyed(stream) { - return stream.destroyed || stream[kDestroyed]; -} - -function isReadable(stream) { - return stream.readable && !stream.readableEnded && !isDestroyed(stream); -} - -function isWritable(stream) { - return stream.writable && !stream.writableEnded && !isDestroyed(stream); -} - // Normalize destroy for legacy. function destroyer(stream, err) { if (isDestroyed(stream)) { return; } - if (!err && (isReadable(stream) || isWritable(stream))) { + if (!err && !isFinished(stream)) { err = new AbortError(); } @@ -422,7 +413,6 @@ function destroyer(stream, err) { module.exports = { kDestroy, - isDestroyed, construct, destroyer, destroy, diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index efc2441c51ee39..89d80f19978bf3 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -17,48 +17,23 @@ const { validateObject, } = require('internal/validators'); +const { + isClosed, + isReadable, + isReadableStream, + isReadableEnded, + isWritable, + isWritableStream, + isWritableFinished, + willEmitClose: _willEmitClose, +} = require('internal/streams/utils'); + function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } -function isServerResponse(stream) { - return ( - typeof stream._sent100 === 'boolean' && - typeof stream._removedConnection === 'boolean' && - typeof stream._removedContLen === 'boolean' && - typeof stream._removedTE === 'boolean' && - typeof stream._closed === 'boolean' - ); -} - -function isReadable(stream) { - return typeof stream.readable === 'boolean' || - typeof stream.readableEnded === 'boolean' || - !!stream._readableState; -} - -function isWritable(stream) { - return typeof stream.writable === 'boolean' || - typeof stream.writableEnded === 'boolean' || - !!stream._writableState; -} - -function isWritableFinished(stream) { - if (stream.writableFinished) return true; - const wState = stream._writableState; - if (!wState || wState.errored) return false; - return wState.finished || (wState.ended && wState.length === 0); -} - const nop = () => {}; -function isReadableEnded(stream) { - if (stream.readableEnded) return true; - const rState = stream._readableState; - if (!rState || rState.errored) return false; - return rState.endEmitted || (rState.ended && rState.length === 0); -} - function eos(stream, options, callback) { if (arguments.length === 2) { callback = options; @@ -74,13 +49,12 @@ function eos(stream, options, callback) { callback = once(callback); const readable = options.readable || - (options.readable !== false && isReadable(stream)); + (options.readable !== false && isReadableStream(stream)); const writable = options.writable || - (options.writable !== false && isWritable(stream)); + (options.writable !== false && isWritableStream(stream)); const wState = stream._writableState; const rState = stream._readableState; - const state = wState || rState; const onlegacyfinish = () => { if (!stream.writable) onfinish(); @@ -89,16 +63,13 @@ function eos(stream, options, callback) { // TODO (ronag): Improve soft detection to include core modules and // common ecosystem modules that do properly emit 'close' but fail // this generic check. - let willEmitClose = isServerResponse(stream) || ( - state && - state.autoDestroy && - state.emitClose && - state.closed === false && - isReadable(stream) === readable && - isWritable(stream) === writable + let willEmitClose = ( + _willEmitClose(stream) && + isReadableStream(stream) === readable && + isWritableStream(stream) === writable ); - let writableFinished = stream.writableFinished || wState?.finished; + let writableFinished = isWritableFinished(stream); const onfinish = () => { writableFinished = true; // Stream should not be destroyed here. If it is that @@ -110,7 +81,7 @@ function eos(stream, options, callback) { if (!readable || readableEnded) callback.call(stream); }; - let readableEnded = stream.readableEnded || rState?.endEmitted; + let readableEnded = isReadableEnded(stream); const onend = () => { readableEnded = true; // Stream should not be destroyed here. If it is that @@ -126,7 +97,7 @@ function eos(stream, options, callback) { callback.call(stream, err); }; - let closed = wState?.closed || rState?.closed; + let closed = isClosed(stream); const onclose = () => { closed = true; @@ -185,19 +156,16 @@ function eos(stream, options, callback) { } } else if ( !readable && - (!willEmitClose || stream.readable) && + (!willEmitClose || isReadable(stream)) && writableFinished ) { process.nextTick(onclose); } else if ( !writable && - (!willEmitClose || stream.writable) && + (!willEmitClose || isWritable(stream)) && readableEnded ) { process.nextTick(onclose); - } else if (!wState && !rState && stream._closed === true) { - // _closed is for OutgoingMessage which is not a proper Writable. - process.nextTick(onclose); } else if ((rState && stream.req && stream.aborted)) { process.nextTick(onclose); } diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 5759dbd4a580a3..ad08054408fa0e 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -27,7 +27,7 @@ const { validateCallback } = require('internal/validators'); const { isIterable, - isReadable, + isReadableStream, isStream, } = require('internal/streams/utils'); @@ -87,7 +87,7 @@ function popCallback(streams) { function makeAsyncIterable(val) { if (isIterable(val)) { return val; - } else if (isReadable(val)) { + } else if (isReadableStream(val)) { // Legacy streams are not Iterable. return fromReadable(val); } @@ -216,7 +216,7 @@ function pipeline(...streams) { throw new ERR_INVALID_RETURN_VALUE( 'Iterable, AsyncIterable or Stream', 'source', ret); } - } else if (isIterable(stream) || isReadable(stream)) { + } else if (isIterable(stream) || isReadableStream(stream)) { ret = stream; } else { throw new ERR_INVALID_ARG_TYPE( @@ -272,7 +272,7 @@ function pipeline(...streams) { destroys.push(destroyer(ret, false, true, finish)); } } else if (isStream(stream)) { - if (isReadable(ret)) { + if (isReadableStream(ret)) { ret.pipe(stream); // Compat. Before node v10.12.0 stdio used to throw an error so diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 62eea022685e18..0695d75ffb75f6 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -1,22 +1,34 @@ 'use strict'; const { + Symbol, SymbolAsyncIterator, SymbolIterator, } = primordials; -function isReadable(obj) { - return !!(obj && typeof obj.pipe === 'function' && - typeof obj.on === 'function'); +const kDestroyed = Symbol('kDestroyed'); + +function isReadableStream(obj) { + return !!( + obj && + typeof obj.pipe === 'function' && + typeof obj.on === 'function' && + (!obj._writableState || obj._readableState?.readable !== false) && // Duplex + (!obj._writableState || obj._readableState) // Writable has .pipe. + ); } -function isWritable(obj) { - return !!(obj && typeof obj.write === 'function' && - typeof obj.on === 'function'); +function isWritableStream(obj) { + return !!( + obj && + typeof obj.write === 'function' && + typeof obj.on === 'function' && + (!obj._readableState || obj._writableState?.writable !== false) // Duplex + ); } function isStream(obj) { - return isReadable(obj) || isWritable(obj); + return isReadableStream(obj) || isWritableStream(obj); } function isIterable(obj, isAsync) { @@ -27,8 +39,129 @@ function isIterable(obj, isAsync) { typeof obj[SymbolIterator] === 'function'; } +function isDestroyed(stream) { + if (!isStream(stream)) return null; + const wState = stream._writableState; + const rState = stream._readableState; + const state = wState || rState; + return !!(stream.destroyed || stream[kDestroyed] || state?.destroyed); +} + +function isWritableFinished(stream) { + if (!isStream(stream)) return null; + if (stream.writableFinished === true) return true; + const wState = stream._writableState; + if (!wState || wState.errored) return false; + return wState.finished || (wState.ended && wState.length === 0); +} + +function isReadableEnded(stream) { + if (!isStream(stream)) return null; + if (stream.readableEnded === true) return true; + const rState = stream._readableState; + if (!rState || rState.errored) return false; + return rState.endEmitted || (rState.ended && rState.length === 0); +} + +function isReadable(stream) { + const r = isReadableStream(stream); + if (r === null || typeof stream.readable !== 'boolean') return null; + if (isDestroyed(stream)) return false; + return r && stream.readable && !isReadableEnded(stream); +} + +function isWritable(stream) { + const r = isWritableStream(stream); + if (r === null || typeof stream.writable !== 'boolean') return null; + if (isDestroyed(stream)) return false; + return r && stream.writable && !isWritableFinished(stream); +} + +function isFinished(stream, opts) { + if (!isStream(stream)) { + return null; + } + + if (isDestroyed(stream)) { + return true; + } + + if (opts?.readable !== false && isReadable(stream)) { + return false; + } + + if (opts?.writable !== false && isWritable(stream)) { + return false; + } + + return true; +} + +function isClosed(stream) { + if (!isStream(stream)) { + return null; + } + + const wState = stream._writableState; + const rState = stream._readableState; + + if ( + typeof wState?.closed === 'boolean' || + typeof rState?.closed === 'boolean' + ) { + return wState?.closed || rState?.closed; + } + + if (stream._closed === 'boolean' && isOutgoingMessage(stream)) { + return stream._closed; + } + + return null; +} + +function isOutgoingMessage(stream) { + return ( + typeof stream._closed === 'boolean' && + typeof stream._defaultKeepAlive === 'boolean' && + typeof stream._removedConnection === 'boolean' && + typeof stream._removedContLen === 'boolean' + ); +} + +function isServerResponse(stream) { + return ( + typeof stream._sent100 === 'boolean' && + isOutgoingMessage(stream) + ); +} + +function willEmitClose(stream) { + if (!isStream(stream)) return null; + + const wState = stream._writableState; + const rState = stream._readableState; + const state = wState || rState; + + return (!state && isServerResponse(stream)) || !!( + state && + state.autoDestroy && + state.emitClose && + state.closed === false + ); +} + module.exports = { + kDestroyed, + isClosed, + isDestroyed, + isFinished, isIterable, isReadable, + isReadableStream, + isReadableEnded, isStream, + isWritable, + isWritableStream, + isWritableFinished, + willEmitClose, }; diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 8e371911698336..d2207664eda40d 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -415,7 +415,7 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); d._writableState = {}; d._writableState.finished = true; finished(d, { readable: false, writable: true }, common.mustCall((err) => { - assert.strictEqual(err, undefined); + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); })); d._writableState.errored = true; d.emit('close'); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index e2e5fe2e0d561a..1f4474e6b5fce6 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1035,7 +1035,7 @@ const net = require('net'); const dst = new PassThrough(); dst.readable = false; pipeline(src, dst, common.mustSucceed(() => { - assert.strictEqual(dst.destroyed, false); + assert.strictEqual(dst.destroyed, true); })); src.end(); }