From 4832a13c9a1321c56056322fc22425f92bbe48f5 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 14 Aug 2019 21:32:09 +0200 Subject: [PATCH 1/3] stream: class readablestate --- lib/_stream_readable.js | 160 ++++++++++++++++++++-------------------- 1 file changed, 80 insertions(+), 80 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 1830eea66decc9..ee0e615f51c4a2 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -69,90 +69,90 @@ function prependListener(emitter, event, fn) { emitter._events[event] = [fn, emitter._events[event]]; } -function ReadableState(options, stream, isDuplex) { - options = options || {}; - - // Duplex streams are both readable and writable, but share - // the same options object. - // However, some cases require setting options to different - // values for the readable and the writable sides of the duplex stream. - // These options can be provided separately as readableXXX and writableXXX. - if (typeof isDuplex !== 'boolean') - isDuplex = stream instanceof Stream.Duplex; - - // Object stream flag. Used to make read(n) ignore n and to - // make all the buffer merging and length checks go away - this.objectMode = !!options.objectMode; - - if (isDuplex) - this.objectMode = this.objectMode || !!options.readableObjectMode; - - // The point at which it stops calling _read() to fill the buffer - // Note: 0 is a valid value, means "don't call _read preemptively ever" - this.highWaterMark = getHighWaterMark(this, options, 'readableHighWaterMark', - isDuplex); - - // A linked list is used to store data chunks instead of an array because the - // linked list can remove elements from the beginning faster than - // array.shift() - this.buffer = new BufferList(); - this.length = 0; - this.pipes = []; - this.flowing = null; - this.ended = false; - this.endEmitted = false; - this.reading = false; - - // A flag to be able to tell if the event 'readable'/'data' is emitted - // immediately, or on a later tick. We set this to true at first, because - // any actions that shouldn't happen until "later" should generally also - // not happen before the first read call. - this.sync = true; - - // Whenever we return null, then we set a flag to say - // that we're awaiting a 'readable' event emission. - this.needReadable = false; - this.emittedReadable = false; - this.readableListening = false; - this.resumeScheduled = false; - this.paused = true; - - // Should close be emitted on destroy. Defaults to true. - this.emitClose = options.emitClose !== false; - - // Should .destroy() be called after 'end' (and potentially 'finish') - this.autoDestroy = !!options.autoDestroy; - - // Has it been destroyed - this.destroyed = false; - - // Crypto is kind of old and crusty. Historically, its default string - // encoding is 'binary' so we have to make this configurable. - // Everything else in the universe uses 'utf8', though. - this.defaultEncoding = options.defaultEncoding || 'utf8'; - - // The number of writers that are awaiting a drain event in .pipe()s - this.awaitDrain = 0; - - // If true, a maybeReadMore has been scheduled - this.readingMore = false; - - this.decoder = null; - this.encoding = null; - if (options.encoding) { - if (!StringDecoder) - StringDecoder = require('string_decoder').StringDecoder; - this.decoder = new StringDecoder(options.encoding); - this.encoding = options.encoding; +class ReadableState { + constructor (options, stream, isDuplex) { + options = options || {}; + + // Duplex streams are both readable and writable, but share + // the same options object. + // However, some cases require setting options to different + // values for the readable and the writable sides of the duplex stream. + // These options can be provided separately as readableXXX and writableXXX. + if (typeof isDuplex !== 'boolean') + isDuplex = stream instanceof Stream.Duplex; + + // Object stream flag. Used to make read(n) ignore n and to + // make all the buffer merging and length checks go away + this.objectMode = !!options.objectMode; + + if (isDuplex) + this.objectMode = this.objectMode || !!options.readableObjectMode; + + // The point at which it stops calling _read() to fill the buffer + // Note: 0 is a valid value, means "don't call _read preemptively ever" + this.highWaterMark = getHighWaterMark(this, options, 'readableHighWaterMark', + isDuplex); + + // A linked list is used to store data chunks instead of an array because the + // linked list can remove elements from the beginning faster than + // array.shift() + this.buffer = new BufferList(); + this.length = 0; + this.pipes = []; + this.flowing = null; + this.ended = false; + this.endEmitted = false; + this.reading = false; + + // A flag to be able to tell if the event 'readable'/'data' is emitted + // immediately, or on a later tick. We set this to true at first, because + // any actions that shouldn't happen until "later" should generally also + // not happen before the first read call. + this.sync = true; + + // Whenever we return null, then we set a flag to say + // that we're awaiting a 'readable' event emission. + this.needReadable = false; + this.emittedReadable = false; + this.readableListening = false; + this.resumeScheduled = false; + this.paused = true; + + // Should close be emitted on destroy. Defaults to true. + this.emitClose = options.emitClose !== false; + + // Should .destroy() be called after 'end' (and potentially 'finish') + this.autoDestroy = !!options.autoDestroy; + + // Has it been destroyed + this.destroyed = false; + + // Crypto is kind of old and crusty. Historically, its default string + // encoding is 'binary' so we have to make this configurable. + // Everything else in the universe uses 'utf8', though. + this.defaultEncoding = options.defaultEncoding || 'utf8'; + + // The number of writers that are awaiting a drain event in .pipe()s + this.awaitDrain = 0; + + // If true, a maybeReadMore has been scheduled + this.readingMore = false; + + this.decoder = null; + this.encoding = null; + if (options.encoding) { + if (!StringDecoder) + StringDecoder = require('string_decoder').StringDecoder; + this.decoder = new StringDecoder(options.encoding); + this.encoding = options.encoding; + } } -} -// Legacy getter for `pipesCount` -Object.defineProperty(ReadableState.prototype, 'pipesCount', { - get() { + // Legacy getter for `pipesCount` + get pipesCount () { return this.pipes.length; } -}); +} function Readable(options) { if (!(this instanceof Readable)) From 11c10a95a9f957ac1ed586cec5a482e1fa982013 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 14 Aug 2019 21:46:06 +0200 Subject: [PATCH 2/3] stream: readable bitfield --- lib/_stream_readable.js | 519 ++++++++++++++++++++++++++++++---------- 1 file changed, 386 insertions(+), 133 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index ee0e615f51c4a2..3408101d97c7e0 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -24,7 +24,6 @@ const { Object } = primordials; module.exports = Readable; -Readable.ReadableState = ReadableState; const EE = require('events'); const Stream = require('stream'); @@ -69,8 +68,30 @@ function prependListener(emitter, event, fn) { emitter._events[event] = [fn, emitter._events[event]]; } +const awaitDrainMask = 0xFFFF; +const objectModeMask = 1 << 16; +const endedMask = 1 << 17; +const endEmittedMask = 1 << 18; +const readingMask = 1 << 19; +const syncMask = 1 << 20; +const needReadableMask = 1 << 21; +const emittedReadableMask = 1 << 22; +const readableListeningMask = 1 << 23; +const resumeScheduledMask = 1 << 24; +const pausedMask = 1 << 25; +const emitCloseMask = 1 << 26; +const autoDestroyMask = 1 << 27; +const destroyedMask = 1 << 28; +const readingMoreMask = 1 << 29; +const flowingNullMask = 1 << 30; +const flowingTrueMask = 1 << 31; + +const flowingMask = (flowingTrueMask | flowingNullMask); + +const kBitfield = Symbol('kBitfield'); + class ReadableState { - constructor (options, stream, isDuplex) { + constructor(options, stream, isDuplex) { options = options || {}; // Duplex streams are both readable and writable, but share @@ -81,78 +102,243 @@ class ReadableState { if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Stream.Duplex; + this[kBitfield] = + syncMask | + pausedMask | + flowingNullMask | + (options.emitClose !== false ? emitCloseMask : 0) | + (options.autoDestroy ? autoDestroyMask : 0) | + (options.objectMode || (isDuplex && options.readableObjectMode) ? + objectModeMask : 0); + // Object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away - this.objectMode = !!options.objectMode; + // this.objectMode = !!options.objectMode; - if (isDuplex) - this.objectMode = this.objectMode || !!options.readableObjectMode; + // if (isDuplex) + // this.objectMode = this.objectMode || !!options.readableObjectMode; // The point at which it stops calling _read() to fill the buffer // Note: 0 is a valid value, means "don't call _read preemptively ever" - this.highWaterMark = getHighWaterMark(this, options, 'readableHighWaterMark', - isDuplex); + this.highWaterMark = getHighWaterMark(this, options, + 'readableHighWaterMark', isDuplex); - // A linked list is used to store data chunks instead of an array because the - // linked list can remove elements from the beginning faster than + // A linked list is used to store data chunks instead of an array because + // the linked list can remove elements from the beginning faster than // array.shift() this.buffer = new BufferList(); this.length = 0; this.pipes = []; - this.flowing = null; - this.ended = false; - this.endEmitted = false; - this.reading = false; + // this.flowing = null; + // this.ended = false; + // this.endEmitted = false; + // this.reading = false; // A flag to be able to tell if the event 'readable'/'data' is emitted // immediately, or on a later tick. We set this to true at first, because // any actions that shouldn't happen until "later" should generally also // not happen before the first read call. - this.sync = true; + // this.sync = true; // Whenever we return null, then we set a flag to say // that we're awaiting a 'readable' event emission. - this.needReadable = false; - this.emittedReadable = false; - this.readableListening = false; - this.resumeScheduled = false; - this.paused = true; + // this.needReadable = false; + // this.emittedReadable = false; + // this.readableListening = false; + // this.resumeScheduled = false; + // this.paused = true; // Should close be emitted on destroy. Defaults to true. - this.emitClose = options.emitClose !== false; + // this.emitClose = options.emitClose !== false; // Should .destroy() be called after 'end' (and potentially 'finish') - this.autoDestroy = !!options.autoDestroy; + // this.autoDestroy = !!options.autoDestroy; // Has it been destroyed - this.destroyed = false; + // this.destroyed = false; // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. // Everything else in the universe uses 'utf8', though. - this.defaultEncoding = options.defaultEncoding || 'utf8'; + if (options.defaultEncoding) { + this.defaultEncoding = options.defaultEncoding; + } // The number of writers that are awaiting a drain event in .pipe()s - this.awaitDrain = 0; + // this.awaitDrain = 0; // If true, a maybeReadMore has been scheduled - this.readingMore = false; + // this.readingMore = false; - this.decoder = null; - this.encoding = null; if (options.encoding) { if (!StringDecoder) StringDecoder = require('string_decoder').StringDecoder; this.decoder = new StringDecoder(options.encoding); - this.encoding = options.encoding; + this.encoding = options.encoding || null; } } // Legacy getter for `pipesCount` - get pipesCount () { + get pipesCount() { return this.pipes.length; } + + // 0 === false + // 1 === null + // 2 === true + get flowing () { + const flowing = this[kBitfield] & flowingMask; + if (!flowing) { + return false; + } else if (flowing === flowingNullMask) { + return null; + } else { + return true; + } + } + set flowing (val) { + this[kBitfield] = this[kBitfield] & ~flowingMask; + if (val !== false) { + this[kBitfield] = this[kBitfield] | + (val ? flowingTrueMask : flowingNullMask); + } + } + + get awaitDrain () { + return this[kBitfield] & awaitDrainMask; + } + set awaitDrain (val) { + this[kBitfield] = (this[kBitfield] & ~awaitDrainMask) | + (val & awaitDrainMask); + } + + get objectMode() { + return Boolean(this[kBitfield] & objectModeMask); + } + set objectMode(val) { + this[kBitfield] = val ? + this[kBitfield] | objectModeMask : + this[kBitfield] & ~objectModeMask; + } + + get ended() { + return Boolean(this[kBitfield] & endedMask); + } + set ended(val) { + this[kBitfield] = val ? + this[kBitfield] | endedMask : + this[kBitfield] & ~endedMask; + } + + get endEmitted() { + return Boolean(this[kBitfield] & endEmittedMask); + } + set endEmitted(val) { + this[kBitfield] = val ? + this[kBitfield] | endEmittedMask : + this[kBitfield] & ~endEmittedMask; + } + + get reading() { + return Boolean(this[kBitfield] & readingMask); + } + set reading(val) { + this[kBitfield] = val ? + this[kBitfield] | readingMask : + this[kBitfield] & ~readingMask; + } + + get sync() { + return Boolean(this[kBitfield] & syncMask); + } + set sync(val) { + this[kBitfield] = val ? + this[kBitfield] | syncMask : + this[kBitfield] & ~syncMask; + } + + get needReadable() { + return Boolean(this[kBitfield] & needReadableMask); + } + set needReadable(val) { + this[kBitfield] = val ? + this[kBitfield] | needReadableMask : + this[kBitfield] & ~needReadableMask; + } + + get emittedReadable() { + return Boolean(this[kBitfield] & emittedReadableMask); + } + set emittedReadable(val) { + this[kBitfield] = val ? + this[kBitfield] | emittedReadableMask : + this[kBitfield] & ~emittedReadableMask; + } + + get readableListening() { + return Boolean(this[kBitfield] & readableListeningMask); + } + set readableListening(val) { + this[kBitfield] = val ? + this[kBitfield] | readableListeningMask : + this[kBitfield] & ~readableListeningMask; + } + + get resumeScheduled() { + return Boolean(this[kBitfield] & resumeScheduledMask); + } + set resumeScheduled(val) { + this[kBitfield] = val ? + this[kBitfield] | resumeScheduledMask : + this[kBitfield] & ~resumeScheduledMask; + } + + get paused() { + return Boolean(this[kBitfield] & pausedMask); + } + set paused(val) { + this[kBitfield] = val ? + this[kBitfield] | pausedMask : + this[kBitfield] & ~pausedMask; + } + + get emitClose() { + return Boolean(this[kBitfield] & emitCloseMask); + } + set emitClose(val) { + this[kBitfield] = val ? + this[kBitfield] | emitCloseMask : + this[kBitfield] & ~emitCloseMask; + } + + get autoDestroy() { + return Boolean(this[kBitfield] & autoDestroyMask); + } + set autoDestroy(val) { + this[kBitfield] = val ? + this[kBitfield] | autoDestroyMask : + this[kBitfield] & ~autoDestroyMask; + } + + get destroyed() { + return Boolean(this[kBitfield] & destroyedMask); + } + set destroyed(val) { + this[kBitfield] = val ? + this[kBitfield] | destroyedMask : + this[kBitfield] & ~destroyedMask; + } + + get readingMore() { + return Boolean(this[kBitfield] & readingMoreMask); + } + set readingMore(val) { + this[kBitfield] = val ? + this[kBitfield] | readingMoreMask : + this[kBitfield] & ~readingMoreMask; + } } +Readable.ReadableState = ReadableState; function Readable(options) { if (!(this instanceof Readable)) @@ -227,9 +413,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { let skipChunkCheck; - if (!state.objectMode) { + const objectMode = state[kBitfield] & objectModeMask; + + if (!objectMode) { if (typeof chunk === 'string') { - encoding = encoding || state.defaultEncoding; + encoding = encoding || state.defaultEncoding || 'utf8'; if (addToFront && state.encoding && state.encoding !== encoding) { // When unshifting, if state.encoding is set, we have to save // the string in the BufferList with the state encoding @@ -245,7 +433,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } if (chunk === null) { - state.reading = false; + // state.reading = false; + state[kBitfield] = state[kBitfield] & ~readingMask; onEofChunk(stream, state); } else { var er; @@ -253,28 +442,29 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { er = chunkInvalid(state, chunk); if (er) { errorOrDestroy(stream, er); - } else if (state.objectMode || chunk && chunk.length > 0) { + } else if (objectMode || chunk && chunk.length > 0) { if (typeof chunk !== 'string' && - !state.objectMode && + !objectMode && // Do not use Object.getPrototypeOf as it is slower since V8 7.3. !(chunk instanceof Buffer)) { chunk = Stream._uint8ArrayToBuffer(chunk); } if (addToFront) { - if (state.endEmitted) + if (state[kBitfield] & endEmittedMask) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); else addChunk(stream, state, chunk, true); - } else if (state.ended) { + } else if (state[kBitfield] & endedMask) { errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); - } else if (state.destroyed) { + } else if (state[kBitfield] & destroyedMask) { return false; } else { - state.reading = false; + // state.reading = false; + state[kBitfield] = state[kBitfield] & ~readingMask; if (state.decoder && !encoding) { chunk = state.decoder.write(chunk); - if (state.objectMode || chunk.length !== 0) + if (objectMode || chunk.length !== 0) addChunk(stream, state, chunk, false); else maybeReadMore(stream, state); @@ -283,7 +473,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } } } else if (!addToFront) { - state.reading = false; + // state.reading = false; + state[kBitfield] = state[kBitfield] & ~readingMask; maybeReadMore(stream, state); } } @@ -291,23 +482,25 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { // We can push more data if we are below the highWaterMark. // Also, if we have no data yet, we can stand some more bytes. // This is to work around cases where hwm=0, such as the repl. - return !state.ended && + return !(state[kBitfield] & endedMask) && (state.length < state.highWaterMark || state.length === 0); } function addChunk(stream, state, chunk, addToFront) { - if (state.flowing && state.length === 0 && !state.sync) { - state.awaitDrain = 0; + if ((state[kBitfield] & flowingTrueMask) && state.length === 0 && + !(state[kBitfield] & syncMask)) { + // state.awaitDrain = 0; + state[kBitfield] = state[kBitfield] & ~awaitDrainMask; stream.emit('data', chunk); } else { // Update the buffer info. - state.length += state.objectMode ? 1 : chunk.length; + state.length += (state[kBitfield] & objectModeMask) ? 1 : chunk.length; if (addToFront) state.buffer.unshift(chunk); else state.buffer.push(chunk); - if (state.needReadable) + if (state[kBitfield] & needReadableMask) emitReadable(stream); } maybeReadMore(stream, state); @@ -317,7 +510,7 @@ function chunkInvalid(state, chunk) { if (!Stream._isUint8Array(chunk) && typeof chunk !== 'string' && chunk !== undefined && - !state.objectMode) { + !(state[kBitfield] & objectModeMask)) { return new ERR_INVALID_ARG_TYPE( 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); } @@ -325,7 +518,7 @@ function chunkInvalid(state, chunk) { Readable.prototype.isPaused = function() { - return this._readableState.flowing === false; + return !(this._readableState[kBitfield] & flowingMask); }; // Backwards compatibility. @@ -372,13 +565,13 @@ function computeNewHighWaterMark(n) { // This function is designed to be inlinable, so please take care when making // changes to the function body. function howMuchToRead(n, state) { - if (n <= 0 || (state.length === 0 && state.ended)) + if (n <= 0 || (state.length === 0 && (state[kBitfield] & endedMask))) return 0; - if (state.objectMode) + if (state[kBitfield] & objectModeMask) return 1; if (Number.isNaN(n)) { // Only flow one buffer at a time - if (state.flowing && state.length) + if ((state[kBitfield] & flowingTrueMask) && state.length) return state.buffer.first().length; else return state.length; @@ -389,8 +582,9 @@ function howMuchToRead(n, state) { if (n <= state.length) return n; // Don't have enough - if (!state.ended) { - state.needReadable = true; + if (!(state[kBitfield] & endedMask)) { + // state.needReadable = true; + state[kBitfield] = state[kBitfield] | needReadableMask; return 0; } return state.length; @@ -407,22 +601,25 @@ Readable.prototype.read = function(n) { n = parseInt(n, 10); } const state = this._readableState; + const nOrig = n; - if (n !== 0) - state.emittedReadable = false; + if (n !== 0) { + // state.emittedReadable = false; + state[kBitfield] = state[kBitfield] & ~emittedReadableMask; + } // If we're doing read(0) to trigger a readable event, but we // already have a bunch of data in the buffer, then just trigger // the 'readable' event and move on. if (n === 0 && - state.needReadable && + (state[kBitfield] & needReadableMask) && ((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) || - state.ended)) { + (state[kBitfield] & endedMask))) { debug('read: emitReadable', state.length, state.ended); - if (state.length === 0 && state.ended) + if (state.length === 0 && (state[kBitfield] & endedMask)) endReadable(this); else emitReadable(this); @@ -432,7 +629,7 @@ Readable.prototype.read = function(n) { n = howMuchToRead(n, state); // If we've ended, and we're now clear, then finish it up. - if (n === 0 && state.ended) { + if (n === 0 && (state[kBitfield] & endedMask)) { if (state.length === 0) endReadable(this); return null; @@ -461,7 +658,7 @@ Readable.prototype.read = function(n) { // 3. Actually pull the requested chunks out of the buffer and return. // if we need a readable event, then we need to do some reading. - var doRead = state.needReadable; + let doRead = state[kBitfield] & needReadableMask; debug('need readable', doRead); // If we currently have less than the highWaterMark, then also read some @@ -472,23 +669,30 @@ Readable.prototype.read = function(n) { // However, if we've ended, then there's no point, and if we're already // reading, then it's unnecessary. - if (state.ended || state.reading) { + if (state[kBitfield] & (endedMask | readingMask)) { doRead = false; debug('reading or ended', doRead); } else if (doRead) { debug('do read'); - state.reading = true; - state.sync = true; + // state.reading = true; + // state.sync = true; // If the length is currently zero, then we *need* a readable event. - if (state.length === 0) - state.needReadable = true; + // if (state.length === 0) + // state.needReadable = true; + state[kBitfield] = state[kBitfield] | + readingMask | syncMask | + (state.length === 0 ? needReadableMask : 0); // Call internal read method this._read(state.highWaterMark); - state.sync = false; + + // state.sync = false; + state[kBitfield] = state[kBitfield] & ~syncMask; + // If _read pushed data synchronously, then `reading` will be false, // and we need to re-evaluate how much data we can return to the user. - if (!state.reading) + if (!(state[kBitfield] & readingMask)) { n = howMuchToRead(nOrig, state); + } } var ret; @@ -498,22 +702,25 @@ Readable.prototype.read = function(n) { ret = null; if (ret === null) { - state.needReadable = state.length <= state.highWaterMark; + // state.needReadable = state.length <= state.highWaterMark; + state[kBitfield] = (state[kBitfield] & ~needReadableMask) | + (state.length <= state.highWaterMark ? needReadableMask : 0); n = 0; } else { state.length -= n; - state.awaitDrain = 0; + // state.awaitDrain = 0; + state[kBitfield] = state[kBitfield] & ~awaitDrainMask; } if (state.length === 0) { // If we have nothing in the buffer, then we want to know // as soon as we *do* get something into the buffer. - if (!state.ended) - state.needReadable = true; - + if (!(state[kBitfield] & endedMask)) + state[kBitfield] = state[kBitfield] | needReadableMask; // If we tried to read() past the EOF, then emit end on the next tick. - if (nOrig !== n && state.ended) + else if (nOrig !== n) { endReadable(this); + } } if (ret !== null) @@ -524,25 +731,30 @@ Readable.prototype.read = function(n) { function onEofChunk(stream, state) { debug('onEofChunk'); - if (state.ended) return; + + if (state[kBitfield] & endedMask) return; if (state.decoder) { var chunk = state.decoder.end(); if (chunk && chunk.length) { state.buffer.push(chunk); - state.length += state.objectMode ? 1 : chunk.length; + state.length += (state[kBitfield] & objectModeMask) ? 1 : chunk.length; } } - state.ended = true; - if (state.sync) { + // state.ended = true; + state[kBitfield] = state[kBitfield] | endedMask; + + if (state[kBitfield] & syncMask) { // If we are sync, wait until next tick to emit the data. // Otherwise we risk emitting data in the flow() // the readable code triggers during a read() call emitReadable(stream); } else { // Emit 'readable' now to make sure it gets picked up. - state.needReadable = false; - state.emittedReadable = true; + // state.needReadable = false; + // state.emittedReadable = true; + state[kBitfield] = (state[kBitfield] & ~needReadableMask) + | emittedReadableMask; // We have to emit readable now that we are EOF. Modules // in the ecosystem (e.g. dicer) rely on this event being sync. emitReadable_(stream); @@ -555,20 +767,28 @@ function onEofChunk(stream, state) { function emitReadable(stream) { const state = stream._readableState; debug('emitReadable', state.needReadable, state.emittedReadable); - state.needReadable = false; - if (!state.emittedReadable) { + // state.needReadable = false; + state[kBitfield] = state[kBitfield] & ~needReadableMask; + if (!(state[kBitfield] & emittedReadableMask)) { debug('emitReadable', state.flowing); - state.emittedReadable = true; + // state.emittedReadable = true; + state[kBitfield] = state[kBitfield] | emittedReadableMask; process.nextTick(emitReadable_, stream); } } function emitReadable_(stream) { const state = stream._readableState; + + const destroyed = state[kBitfield] & destroyedMask; + const ended = state[kBitfield] & endedMask; + const flowing = state[kBitfield] & flowingTrueMask; + debug('emitReadable_', state.destroyed, state.length, state.ended); - if (!state.destroyed && (state.length || state.ended)) { + if (!destroyed && (ended || state.length)) { stream.emit('readable'); - state.emittedReadable = false; + // state.emittedReadable = false; + state[kBitfield] = state[kBitfield] & ~emittedReadableMask; } // The stream needs another readable event if @@ -577,10 +797,10 @@ function emitReadable_(stream) { // 2. It is not ended. // 3. It is below the highWaterMark, so we can schedule // another readable later. - state.needReadable = - !state.flowing && - !state.ended && + const needReadable = !flowing && !ended && state.length <= state.highWaterMark; + // state.emittedReadable = needReadable; + state[kBitfield] = state[kBitfield] | (needReadable ? needReadableMask : 0); flow(stream); } @@ -592,8 +812,9 @@ function emitReadable_(stream) { // However, if we're not ended, or reading, and the length < hwm, // then go ahead and try to read some more preemptively. function maybeReadMore(stream, state) { - if (!state.readingMore) { - state.readingMore = true; + if (!(state[kBitfield] & readingMoreMask)) { + // state.readingMore = readingMore; + state[kBitfield] = state[kBitfield] | readingMoreMask; process.nextTick(maybeReadMore_, stream, state); } } @@ -622,9 +843,9 @@ function maybeReadMore_(stream, state) { // called push() with new data. In this case we skip performing more // read()s. The execution ends in this method again after the _read() ends // up calling push() with more data. - while (!state.reading && !state.ended && + while (!(state[kBitfield] & (readingMask | endedMask)) && (state.length < state.highWaterMark || - (state.flowing && state.length === 0))) { + ((state[kBitfield] & flowingTrueMask) && state.length === 0))) { const len = state.length; debug('maybeReadMore read 0'); stream.read(0); @@ -632,7 +853,8 @@ function maybeReadMore_(stream, state) { // Didn't get any data, stop spinning. break; } - state.readingMore = false; + // state.readingMore = false; + state[kBitfield] = state[kBitfield] & ~readingMoreMask; } // Abstract method. to be overridden in specific implementation classes. @@ -655,7 +877,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest !== process.stderr; const endFn = doEnd ? onend : unpipe; - if (state.endEmitted) + if (state[kBitfield] & endEmittedMask) process.nextTick(endFn); else src.once('end', endFn); @@ -703,7 +925,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // flowing again. // So, if this is awaiting a drain, then we just call it now. // If we don't know, then assume that we are waiting for one. - if (state.awaitDrain && + if ((state[kBitfield] & awaitDrainMask) && (!dest._writableState || dest._writableState.needDrain)) ondrain(); } @@ -720,7 +942,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // => Check whether `dest` is still a piping destination. if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) { debug('false write response, pause', state.awaitDrain); - state.awaitDrain++; + + // state.awaitDrain++; + state[kBitfield]++; } src.pause(); } @@ -761,7 +985,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.emit('pipe', src); // Start the flow if it hasn't been started already. - if (!state.flowing) { + if (!(state[kBitfield] & flowingTrueMask)) { debug('pipe resume'); src.resume(); } @@ -772,11 +996,16 @@ Readable.prototype.pipe = function(dest, pipeOpts) { function pipeOnDrain(src) { return function pipeOnDrainFunctionResult() { const state = src._readableState; + debug('pipeOnDrain', state.awaitDrain); - if (state.awaitDrain) - state.awaitDrain--; - if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) { - state.flowing = true; + if (state[kBitfield] & awaitDrainMask) { + // state.awaitDrain--; + state[kBitfield]--; + } + + if (!(state[kBitfield] & awaitDrainMask) && EE.listenerCount(src, 'data')) { + // state.flowing = true; + state[kBitfield] = state[kBitfield] | flowingTrueMask; flow(src); } }; @@ -795,7 +1024,7 @@ Readable.prototype.unpipe = function(dest) { // remove all. var dests = state.pipes; state.pipes = []; - state.flowing = false; + state[kBitfield] = state[kBitfield] & ~flowingMask; for (var i = 0; i < dests.length; i++) dests[i].emit('unpipe', this, { hasUnpiped: false }); @@ -809,7 +1038,7 @@ Readable.prototype.unpipe = function(dest) { state.pipes.splice(index, 1); if (state.pipes.length === 0) - state.flowing = false; + state[kBitfield] = state[kBitfield] & ~flowingMask; dest.emit('unpipe', this, unpipeInfo); @@ -825,20 +1054,25 @@ Readable.prototype.on = function(ev, fn) { if (ev === 'data') { // Update readableListening so that resume() may be a no-op // a few lines down. This is needed to support once('readable'). - state.readableListening = this.listenerCount('readable') > 0; + // state.readableListening = this.listenerCount('readable') > 0; + state[kBitfield] = (state[kBitfield] & ~readableListeningMask) | + (this.listenerCount('readable') > 0 ? readableListeningMask : 0); // Try start flowing on next tick if stream isn't explicitly paused - if (state.flowing !== false) + if (state[kBitfield] & flowingMask) this.resume(); } else if (ev === 'readable') { - if (!state.endEmitted && !state.readableListening) { - state.readableListening = state.needReadable = true; - state.flowing = false; - state.emittedReadable = false; + if (!(state[kBitfield] & (endEmittedMask | readableListeningMask))) { + // state.readableListening = state.needReadable = true; + // state.flowing = false; + // state.emittedReadable = false; + state[kBitfield] = (state[kBitfield] & + ~(emittedReadableMask | flowingMask)) | + readableListeningMask | needReadableMask; debug('on readable', state.length, state.reading); if (state.length) { emitReadable(this); - } else if (!state.reading) { + } else if (!(state[kBitfield] & readingMask)) { process.nextTick(nReadingNextTick, this); } } @@ -882,12 +1116,17 @@ Readable.prototype.removeAllListeners = function(ev) { function updateReadableListening(self) { const state = self._readableState; - state.readableListening = self.listenerCount('readable') > 0; - if (state.resumeScheduled && !state.paused) { + // state.readableListening = self.listenerCount('readable') > 0; + state[kBitfield] = (state[kBitfield] & ~readableListeningMask) | + (self.listenerCount('readable') > 0 ? readableListeningMask : 0); + + if ((state[kBitfield] & resumeScheduledMask) && + !(state[kBitfield] & pausedMask)) { // Flowing needs to be set to true now, otherwise // the upcoming resume will not flow. - state.flowing = true; + // state.flowing = true; + state[kBitfield] = state[kBitfield] | flowingTrueMask; // Crude way to check if we should resume } else if (self.listenerCount('data') > 0) { @@ -904,53 +1143,64 @@ function nReadingNextTick(self) { // If the user uses them, then switch into old mode. Readable.prototype.resume = function() { const state = this._readableState; - if (!state.flowing) { + + if (!(state[kBitfield] & flowingTrueMask)) { debug('resume'); // We flow only if there is no one listening // for readable, but we still have to call // resume() - state.flowing = !state.readableListening; + const readableListening = state[kBitfield] & readableListeningMask; + // state.flowing = !readableListening; + state[kBitfield] = (state[kBitfield] & ~flowingMask) | + (!readableListening ? flowingTrueMask : 0); resume(this, state); } - state.paused = false; + // state.paused = false; + state[kBitfield] = state[kBitfield] & ~pausedMask; return this; }; function resume(stream, state) { - if (!state.resumeScheduled) { - state.resumeScheduled = true; + if (!(state[kBitfield] & resumeScheduledMask)) { + // state.resumeScheduled = true; + state[kBitfield] = state[kBitfield] | resumeScheduledMask; process.nextTick(resume_, stream, state); } } function resume_(stream, state) { debug('resume', state.reading); - if (!state.reading) { + if (!(state[kBitfield] & readingMask)) { stream.read(0); } - state.resumeScheduled = false; + // state.resumeScheduled = false; + state[kBitfield] = state[kBitfield] & ~resumeScheduledMask; stream.emit('resume'); flow(stream); - if (state.flowing && !state.reading) + if ((state[kBitfield] & flowingTrueMask) && + !(state[kBitfield] & readingMask)) stream.read(0); } Readable.prototype.pause = function() { - debug('call pause flowing=%j', this._readableState.flowing); - if (this._readableState.flowing !== false) { + const state = this._readableState; + debug('call pause flowing=%j', state.flowing); + if (state[kBitfield] & flowingMask) { debug('pause'); - this._readableState.flowing = false; + // state.flowing = false; + state[kBitfield] = state[kBitfield] & ~flowingMask; this.emit('pause'); } - this._readableState.paused = true; + // state.paused = true; + state[kBitfield] = state[kBitfield] | pausedMask; return this; }; function flow(stream) { const state = stream._readableState; debug('flow', state.flowing); - while (state.flowing && stream.read() !== null); + while ((state[kBitfield] & flowingTrueMask) && stream.read() !== null); } // Wrap an old-style stream as the async data source. @@ -962,7 +1212,7 @@ Readable.prototype.wrap = function(stream) { stream.on('end', () => { debug('wrapped end'); - if (state.decoder && !state.ended) { + if (state.decoder && !(state[kBitfield] & endedMask)) { var chunk = state.decoder.end(); if (chunk && chunk.length) this.push(chunk); @@ -976,10 +1226,11 @@ Readable.prototype.wrap = function(stream) { if (state.decoder) chunk = state.decoder.write(chunk); + const objectMode = state[kBitfield] & objectModeMask; // Don't skip over falsy values in objectMode - if (state.objectMode && (chunk === null || chunk === undefined)) + if (objectMode && (chunk === null || chunk === undefined)) return; - else if (!state.objectMode && (!chunk || !chunk.length)) + else if (!objectMode && (!chunk || !chunk.length)) return; const ret = this.push(chunk); @@ -1098,7 +1349,7 @@ function fromList(n, state) { return null; var ret; - if (state.objectMode) + if (state[kBitfield] & objectModeMask) ret = state.buffer.shift(); else if (!n || n >= state.length) { // Read it all, truncate the list @@ -1121,8 +1372,9 @@ function endReadable(stream) { const state = stream._readableState; debug('endReadable', state.endEmitted); - if (!state.endEmitted) { - state.ended = true; + if (!(state[kBitfield] & endEmittedMask)) { + // state.ended = true; + state[kBitfield] = state[kBitfield] | endedMask; process.nextTick(endReadableNT, state, stream); } } @@ -1131,12 +1383,13 @@ function endReadableNT(state, stream) { debug('endReadableNT', state.endEmitted, state.length); // Check that we didn't get one last unshift. - if (!state.endEmitted && state.length === 0) { - state.endEmitted = true; + if (!(state[kBitfield] & endEmittedMask) && state.length === 0) { + // state.endEmitted = true; + state[kBitfield] = state[kBitfield] | endEmittedMask; stream.readable = false; stream.emit('end'); - if (state.autoDestroy) { + if (state[kBitfield] & autoDestroyMask) { // In case of duplex streams we need a way to detect // if the writable side is ready for autoDestroy as well const wState = stream._writableState; From bbb518b3fc3edaa05408a2c98d07b3c273a90269 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 16 Aug 2019 09:50:47 +0200 Subject: [PATCH 3/3] stream: use local bitfield --- lib/_stream_readable.js | 495 +++++++++++++++++++++------------------- 1 file changed, 255 insertions(+), 240 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 3408101d97c7e0..a66ab96905833b 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -49,6 +49,25 @@ Object.setPrototypeOf(Readable, Stream); const { errorOrDestroy } = destroyImpl; const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; +const kBitfield = Symbol('kBitfield'); +const kAwaitDrainMask = 0xFFFF; +const kObjectModeMask = 1 << 16; +const kEndedMask = 1 << 17; +const kEndEmittedMask = 1 << 18; +const kReadingMask = 1 << 19; +const kSyncMask = 1 << 20; +const kNeedReadableMask = 1 << 21; +const kEmittedReadableMask = 1 << 22; +const kReadableListeningMask = 1 << 23; +const kResumeScheduledMask = 1 << 24; +const kPausedMask = 1 << 25; +const kEmitCloseMask = 1 << 26; +const kAutoDestroyMask = 1 << 27; +const kDestroyedMask = 1 << 28; +const kReadingMoreMask = 1 << 29; +const kFlowingNullMask = 1 << 30; +const kFlowingTrueMask = 1 << 31; +const kFlowingMask = (kFlowingTrueMask | kFlowingNullMask); function prependListener(emitter, event, fn) { // Sadly this is not cacheable as some libraries bundle their own @@ -68,28 +87,6 @@ function prependListener(emitter, event, fn) { emitter._events[event] = [fn, emitter._events[event]]; } -const awaitDrainMask = 0xFFFF; -const objectModeMask = 1 << 16; -const endedMask = 1 << 17; -const endEmittedMask = 1 << 18; -const readingMask = 1 << 19; -const syncMask = 1 << 20; -const needReadableMask = 1 << 21; -const emittedReadableMask = 1 << 22; -const readableListeningMask = 1 << 23; -const resumeScheduledMask = 1 << 24; -const pausedMask = 1 << 25; -const emitCloseMask = 1 << 26; -const autoDestroyMask = 1 << 27; -const destroyedMask = 1 << 28; -const readingMoreMask = 1 << 29; -const flowingNullMask = 1 << 30; -const flowingTrueMask = 1 << 31; - -const flowingMask = (flowingTrueMask | flowingNullMask); - -const kBitfield = Symbol('kBitfield'); - class ReadableState { constructor(options, stream, isDuplex) { options = options || {}; @@ -103,13 +100,13 @@ class ReadableState { isDuplex = stream instanceof Stream.Duplex; this[kBitfield] = - syncMask | - pausedMask | - flowingNullMask | - (options.emitClose !== false ? emitCloseMask : 0) | - (options.autoDestroy ? autoDestroyMask : 0) | + kSyncMask | + kPausedMask | + kFlowingNullMask | + (options.emitClose !== false ? kEmitCloseMask : 0) | + (options.autoDestroy ? kAutoDestroyMask : 0) | (options.objectMode || (isDuplex && options.readableObjectMode) ? - objectModeMask : 0); + kObjectModeMask : 0); // Object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away @@ -186,156 +183,156 @@ class ReadableState { // 0 === false // 1 === null // 2 === true - get flowing () { - const flowing = this[kBitfield] & flowingMask; + get flowing() { + const flowing = this[kBitfield] & kFlowingMask; if (!flowing) { return false; - } else if (flowing === flowingNullMask) { + } else if (flowing === kFlowingNullMask) { return null; } else { return true; } } - set flowing (val) { - this[kBitfield] = this[kBitfield] & ~flowingMask; + set flowing(val) { + this[kBitfield] = this[kBitfield] & ~kFlowingMask; if (val !== false) { this[kBitfield] = this[kBitfield] | - (val ? flowingTrueMask : flowingNullMask); + (val ? kFlowingTrueMask : kFlowingNullMask); } } - get awaitDrain () { - return this[kBitfield] & awaitDrainMask; + get awaitDrain() { + return this[kBitfield] & kAwaitDrainMask; } - set awaitDrain (val) { - this[kBitfield] = (this[kBitfield] & ~awaitDrainMask) | - (val & awaitDrainMask); + set awaitDrain(val) { + this[kBitfield] = (this[kBitfield] & ~kAwaitDrainMask) | + (val & kAwaitDrainMask); } get objectMode() { - return Boolean(this[kBitfield] & objectModeMask); + return Boolean(this[kBitfield] & kObjectModeMask); } set objectMode(val) { this[kBitfield] = val ? - this[kBitfield] | objectModeMask : - this[kBitfield] & ~objectModeMask; + this[kBitfield] | kObjectModeMask : + this[kBitfield] & ~kObjectModeMask; } get ended() { - return Boolean(this[kBitfield] & endedMask); + return Boolean(this[kBitfield] & kEndedMask); } set ended(val) { this[kBitfield] = val ? - this[kBitfield] | endedMask : - this[kBitfield] & ~endedMask; + this[kBitfield] | kEndedMask : + this[kBitfield] & ~kEndedMask; } get endEmitted() { - return Boolean(this[kBitfield] & endEmittedMask); + return Boolean(this[kBitfield] & kEndEmittedMask); } set endEmitted(val) { this[kBitfield] = val ? - this[kBitfield] | endEmittedMask : - this[kBitfield] & ~endEmittedMask; + this[kBitfield] | kEndEmittedMask : + this[kBitfield] & ~kEndEmittedMask; } get reading() { - return Boolean(this[kBitfield] & readingMask); + return Boolean(this[kBitfield] & kReadingMask); } set reading(val) { this[kBitfield] = val ? - this[kBitfield] | readingMask : - this[kBitfield] & ~readingMask; + this[kBitfield] | kReadingMask : + this[kBitfield] & ~kReadingMask; } get sync() { - return Boolean(this[kBitfield] & syncMask); + return Boolean(this[kBitfield] & kSyncMask); } set sync(val) { this[kBitfield] = val ? - this[kBitfield] | syncMask : - this[kBitfield] & ~syncMask; + this[kBitfield] | kSyncMask : + this[kBitfield] & ~kSyncMask; } get needReadable() { - return Boolean(this[kBitfield] & needReadableMask); + return Boolean(this[kBitfield] & kNeedReadableMask); } set needReadable(val) { this[kBitfield] = val ? - this[kBitfield] | needReadableMask : - this[kBitfield] & ~needReadableMask; + this[kBitfield] | kNeedReadableMask : + this[kBitfield] & ~kNeedReadableMask; } get emittedReadable() { - return Boolean(this[kBitfield] & emittedReadableMask); + return Boolean(this[kBitfield] & kEmittedReadableMask); } set emittedReadable(val) { this[kBitfield] = val ? - this[kBitfield] | emittedReadableMask : - this[kBitfield] & ~emittedReadableMask; + this[kBitfield] | kEmittedReadableMask : + this[kBitfield] & ~kEmittedReadableMask; } get readableListening() { - return Boolean(this[kBitfield] & readableListeningMask); + return Boolean(this[kBitfield] & kReadableListeningMask); } set readableListening(val) { this[kBitfield] = val ? - this[kBitfield] | readableListeningMask : - this[kBitfield] & ~readableListeningMask; + this[kBitfield] | kReadableListeningMask : + this[kBitfield] & ~kReadableListeningMask; } get resumeScheduled() { - return Boolean(this[kBitfield] & resumeScheduledMask); + return Boolean(this[kBitfield] & kResumeScheduledMask); } set resumeScheduled(val) { this[kBitfield] = val ? - this[kBitfield] | resumeScheduledMask : - this[kBitfield] & ~resumeScheduledMask; + this[kBitfield] | kResumeScheduledMask : + this[kBitfield] & ~kResumeScheduledMask; } get paused() { - return Boolean(this[kBitfield] & pausedMask); + return Boolean(this[kBitfield] & kPausedMask); } set paused(val) { this[kBitfield] = val ? - this[kBitfield] | pausedMask : - this[kBitfield] & ~pausedMask; + this[kBitfield] | kPausedMask : + this[kBitfield] & ~kPausedMask; } get emitClose() { - return Boolean(this[kBitfield] & emitCloseMask); + return Boolean(this[kBitfield] & kEmitCloseMask); } set emitClose(val) { this[kBitfield] = val ? - this[kBitfield] | emitCloseMask : - this[kBitfield] & ~emitCloseMask; + this[kBitfield] | kEmitCloseMask : + this[kBitfield] & ~kEmitCloseMask; } get autoDestroy() { - return Boolean(this[kBitfield] & autoDestroyMask); + return Boolean(this[kBitfield] & kAutoDestroyMask); } set autoDestroy(val) { this[kBitfield] = val ? - this[kBitfield] | autoDestroyMask : - this[kBitfield] & ~autoDestroyMask; + this[kBitfield] | kAutoDestroyMask : + this[kBitfield] & ~kAutoDestroyMask; } get destroyed() { - return Boolean(this[kBitfield] & destroyedMask); + return Boolean(this[kBitfield] & kDestroyedMask); } set destroyed(val) { this[kBitfield] = val ? - this[kBitfield] | destroyedMask : - this[kBitfield] & ~destroyedMask; + this[kBitfield] | kDestroyedMask : + this[kBitfield] & ~kDestroyedMask; } get readingMore() { - return Boolean(this[kBitfield] & readingMoreMask); + return Boolean(this[kBitfield] & kReadingMoreMask); } set readingMore(val) { this[kBitfield] = val ? - this[kBitfield] | readingMoreMask : - this[kBitfield] & ~readingMoreMask; + this[kBitfield] | kReadingMoreMask : + this[kBitfield] & ~kReadingMoreMask; } } Readable.ReadableState = ReadableState; @@ -413,7 +410,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { let skipChunkCheck; - const objectMode = state[kBitfield] & objectModeMask; + let bitfield = state[kBitfield]; + const objectMode = bitfield & kObjectModeMask; if (!objectMode) { if (typeof chunk === 'string') { @@ -434,14 +432,15 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { if (chunk === null) { // state.reading = false; - state[kBitfield] = state[kBitfield] & ~readingMask; - onEofChunk(stream, state); + bitfield = bitfield & ~kReadingMask; + bitfield = onEofChunk(stream, state, bitfield); } else { var er; if (!skipChunkCheck) - er = chunkInvalid(state, chunk); + er = chunkInvalid(chunk, objectMode); if (er) { errorOrDestroy(stream, er); + bitfield = state[kBitfield]; } else if (objectMode || chunk && chunk.length > 0) { if (typeof chunk !== 'string' && !objectMode && @@ -451,66 +450,72 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } if (addToFront) { - if (state[kBitfield] & endEmittedMask) + if (bitfield & kEndEmittedMask) { errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); - else - addChunk(stream, state, chunk, true); - } else if (state[kBitfield] & endedMask) { + bitfield = state[kBitfield]; + } else { + bitfield = addChunk(stream, state, chunk, true, bitfield); + } + } else if (bitfield & kEndedMask) { errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); - } else if (state[kBitfield] & destroyedMask) { + bitfield = state[kBitfield]; + } else if (bitfield & kDestroyedMask) { return false; } else { // state.reading = false; - state[kBitfield] = state[kBitfield] & ~readingMask; + bitfield = bitfield & ~kReadingMask; if (state.decoder && !encoding) { chunk = state.decoder.write(chunk); if (objectMode || chunk.length !== 0) - addChunk(stream, state, chunk, false); + bitfield = addChunk(stream, state, chunk, false, bitfield); else - maybeReadMore(stream, state); + bitfield = maybeReadMore(stream, state, bitfield); } else { - addChunk(stream, state, chunk, false); + bitfield = addChunk(stream, state, chunk, false, bitfield); } } } else if (!addToFront) { // state.reading = false; - state[kBitfield] = state[kBitfield] & ~readingMask; - maybeReadMore(stream, state); + bitfield = bitfield & ~kReadingMask; + bitfield = maybeReadMore(stream, state, bitfield); } } + state[kBitfield] = bitfield; + // We can push more data if we are below the highWaterMark. // Also, if we have no data yet, we can stand some more bytes. // This is to work around cases where hwm=0, such as the repl. - return !(state[kBitfield] & endedMask) && + return !(bitfield & kEndedMask) && (state.length < state.highWaterMark || state.length === 0); } -function addChunk(stream, state, chunk, addToFront) { - if ((state[kBitfield] & flowingTrueMask) && state.length === 0 && - !(state[kBitfield] & syncMask)) { +function addChunk(stream, state, chunk, addToFront, bitfield) { + if ((bitfield & kFlowingTrueMask) && state.length === 0 && + !(bitfield & kSyncMask)) { // state.awaitDrain = 0; - state[kBitfield] = state[kBitfield] & ~awaitDrainMask; + state[kBitfield] = bitfield & ~kAwaitDrainMask; stream.emit('data', chunk); + bitfield = state[kBitfield]; } else { // Update the buffer info. - state.length += (state[kBitfield] & objectModeMask) ? 1 : chunk.length; + state.length += (bitfield & kObjectModeMask) ? 1 : chunk.length; if (addToFront) state.buffer.unshift(chunk); else state.buffer.push(chunk); - if (state[kBitfield] & needReadableMask) - emitReadable(stream); + if (bitfield & kNeedReadableMask) + bitfield = emitReadable(stream, bitfield); } - maybeReadMore(stream, state); + return maybeReadMore(stream, state, bitfield); } -function chunkInvalid(state, chunk) { +function chunkInvalid(chunk, objectMode) { if (!Stream._isUint8Array(chunk) && typeof chunk !== 'string' && chunk !== undefined && - !(state[kBitfield] & objectModeMask)) { + !objectMode) { return new ERR_INVALID_ARG_TYPE( 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); } @@ -518,7 +523,7 @@ function chunkInvalid(state, chunk) { Readable.prototype.isPaused = function() { - return !(this._readableState[kBitfield] & flowingMask); + return !(this._readableState[kBitfield] & kFlowingMask); }; // Backwards compatibility. @@ -564,30 +569,21 @@ function computeNewHighWaterMark(n) { // This function is designed to be inlinable, so please take care when making // changes to the function body. -function howMuchToRead(n, state) { - if (n <= 0 || (state.length === 0 && (state[kBitfield] & endedMask))) +function howMuchToRead(n, state, bitfield) { + if (n <= 0 || (state.length === 0 && (bitfield & kEndedMask))) return 0; - if (state[kBitfield] & objectModeMask) + if (bitfield & kObjectModeMask) return 1; if (Number.isNaN(n)) { // Only flow one buffer at a time - if ((state[kBitfield] & flowingTrueMask) && state.length) + if ((bitfield & kFlowingTrueMask) && state.length) return state.buffer.first().length; else return state.length; } - // If we're asking for more than the current hwm, then raise the hwm. - if (n > state.highWaterMark) - state.highWaterMark = computeNewHighWaterMark(n); if (n <= state.length) return n; - // Don't have enough - if (!(state[kBitfield] & endedMask)) { - // state.needReadable = true; - state[kBitfield] = state[kBitfield] | needReadableMask; - return 0; - } - return state.length; + return (bitfield & kEndedMask) ? state.length : 0; } // You can override either this method, or the async _read(n) below. @@ -602,36 +598,43 @@ Readable.prototype.read = function(n) { } const state = this._readableState; + // If we're asking for more than the current hwm, then raise the hwm. + if (n > state.highWaterMark) + state.highWaterMark = computeNewHighWaterMark(n); + + let bitfield = state[kBitfield]; + const nOrig = n; if (n !== 0) { // state.emittedReadable = false; - state[kBitfield] = state[kBitfield] & ~emittedReadableMask; + bitfield = bitfield & ~kEmittedReadableMask; } // If we're doing read(0) to trigger a readable event, but we // already have a bunch of data in the buffer, then just trigger // the 'readable' event and move on. if (n === 0 && - (state[kBitfield] & needReadableMask) && + (bitfield & kNeedReadableMask) && ((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) || - (state[kBitfield] & endedMask))) { - debug('read: emitReadable', state.length, state.ended); - if (state.length === 0 && (state[kBitfield] & endedMask)) - endReadable(this); + (bitfield & kEndedMask))) { + debug('read: emitReadable', state.length, bitfield & kEndedMask); + if (state.length === 0 && (bitfield & kEndedMask)) + state[kBitfield] = endReadable(this, bitfield); else - emitReadable(this); + state[kBitfield] = emitReadable(this, bitfield); return null; } - n = howMuchToRead(n, state); + n = howMuchToRead(n, state, bitfield); // If we've ended, and we're now clear, then finish it up. - if (n === 0 && (state[kBitfield] & endedMask)) { - if (state.length === 0) - endReadable(this); + if (n === 0 && (bitfield & kEndedMask)) { + if (state.length === 0) { + state[kBitfield] = endReadable(this, bitfield); + } return null; } @@ -658,7 +661,7 @@ Readable.prototype.read = function(n) { // 3. Actually pull the requested chunks out of the buffer and return. // if we need a readable event, then we need to do some reading. - let doRead = state[kBitfield] & needReadableMask; + let doRead = bitfield & kNeedReadableMask; debug('need readable', doRead); // If we currently have less than the highWaterMark, then also read some @@ -669,7 +672,7 @@ Readable.prototype.read = function(n) { // However, if we've ended, then there's no point, and if we're already // reading, then it's unnecessary. - if (state[kBitfield] & (endedMask | readingMask)) { + if (bitfield & (kEndedMask | kReadingMask)) { doRead = false; debug('reading or ended', doRead); } else if (doRead) { @@ -679,116 +682,123 @@ Readable.prototype.read = function(n) { // If the length is currently zero, then we *need* a readable event. // if (state.length === 0) // state.needReadable = true; - state[kBitfield] = state[kBitfield] | - readingMask | syncMask | - (state.length === 0 ? needReadableMask : 0); + bitfield = bitfield | + kReadingMask | kSyncMask | + (state.length === 0 ? kNeedReadableMask : 0); // Call internal read method + state[kBitfield] = bitfield; this._read(state.highWaterMark); + bitfield = state[kBitfield]; // state.sync = false; - state[kBitfield] = state[kBitfield] & ~syncMask; + bitfield = bitfield & ~kSyncMask; // If _read pushed data synchronously, then `reading` will be false, // and we need to re-evaluate how much data we can return to the user. - if (!(state[kBitfield] & readingMask)) { - n = howMuchToRead(nOrig, state); + if (!(bitfield & kReadingMask)) { + n = howMuchToRead(nOrig, state, bitfield); } } var ret; if (n > 0) - ret = fromList(n, state); + ret = fromList(n, state, bitfield & kObjectModeMask); else ret = null; if (ret === null) { // state.needReadable = state.length <= state.highWaterMark; - state[kBitfield] = (state[kBitfield] & ~needReadableMask) | - (state.length <= state.highWaterMark ? needReadableMask : 0); + bitfield = (bitfield & ~kNeedReadableMask) | + (state.length <= state.highWaterMark ? kNeedReadableMask : 0); n = 0; } else { state.length -= n; // state.awaitDrain = 0; - state[kBitfield] = state[kBitfield] & ~awaitDrainMask; + bitfield = bitfield & ~kAwaitDrainMask; } if (state.length === 0) { // If we have nothing in the buffer, then we want to know // as soon as we *do* get something into the buffer. - if (!(state[kBitfield] & endedMask)) - state[kBitfield] = state[kBitfield] | needReadableMask; + if (!(bitfield & kEndedMask)) + bitfield = bitfield | kNeedReadableMask; // If we tried to read() past the EOF, then emit end on the next tick. - else if (nOrig !== n) { - endReadable(this); - } + else if (nOrig !== n) + bitfield = endReadable(this, bitfield); } + state[kBitfield] = bitfield; + if (ret !== null) this.emit('data', ret); return ret; }; -function onEofChunk(stream, state) { +function onEofChunk(stream, state, bitfield) { debug('onEofChunk'); - if (state[kBitfield] & endedMask) return; + if (bitfield & kEndedMask) return bitfield; if (state.decoder) { var chunk = state.decoder.end(); if (chunk && chunk.length) { state.buffer.push(chunk); - state.length += (state[kBitfield] & objectModeMask) ? 1 : chunk.length; + state.length += (bitfield & kObjectModeMask) ? 1 : chunk.length; } } // state.ended = true; - state[kBitfield] = state[kBitfield] | endedMask; + bitfield = bitfield | kEndedMask; - if (state[kBitfield] & syncMask) { + if (bitfield & kSyncMask) { // If we are sync, wait until next tick to emit the data. // Otherwise we risk emitting data in the flow() // the readable code triggers during a read() call - emitReadable(stream); + bitfield = emitReadable(stream, bitfield); } else { // Emit 'readable' now to make sure it gets picked up. // state.needReadable = false; // state.emittedReadable = true; - state[kBitfield] = (state[kBitfield] & ~needReadableMask) - | emittedReadableMask; + bitfield = (bitfield & ~kNeedReadableMask) | kEmittedReadableMask; // We have to emit readable now that we are EOF. Modules // in the ecosystem (e.g. dicer) rely on this event being sync. + state[kBitfield] = bitfield; emitReadable_(stream); + bitfield = state[kBitfield]; } + + return bitfield; } // Don't emit readable right away in sync mode, because this can trigger // another read() call => stack overflow. This way, it might trigger // a nextTick recursion warning, but that's not so bad. -function emitReadable(stream) { - const state = stream._readableState; - debug('emitReadable', state.needReadable, state.emittedReadable); +function emitReadable(stream, bitfield) { + debug('emitReadable', bitfield & kNeedReadableMask, + bitfield & kEmittedReadableMask); // state.needReadable = false; - state[kBitfield] = state[kBitfield] & ~needReadableMask; - if (!(state[kBitfield] & emittedReadableMask)) { - debug('emitReadable', state.flowing); + bitfield = bitfield & ~kNeedReadableMask; + if (!(bitfield & kEmittedReadableMask)) { + debug('emitReadable', bitfield & kFlowingTrueMask); // state.emittedReadable = true; - state[kBitfield] = state[kBitfield] | emittedReadableMask; + bitfield = bitfield | kEmittedReadableMask; process.nextTick(emitReadable_, stream); } + return bitfield; } function emitReadable_(stream) { const state = stream._readableState; - const destroyed = state[kBitfield] & destroyedMask; - const ended = state[kBitfield] & endedMask; - const flowing = state[kBitfield] & flowingTrueMask; + const destroyed = state[kBitfield] & kDestroyedMask; + const ended = state[kBitfield] & kEndedMask; + const flowing = state[kBitfield] & kFlowingTrueMask; - debug('emitReadable_', state.destroyed, state.length, state.ended); + debug('emitReadable_', destroyed, state.length, ended); if (!destroyed && (ended || state.length)) { stream.emit('readable'); // state.emittedReadable = false; - state[kBitfield] = state[kBitfield] & ~emittedReadableMask; + state[kBitfield] = state[kBitfield] & ~kEmittedReadableMask; } // The stream needs another readable event if @@ -800,7 +810,7 @@ function emitReadable_(stream) { const needReadable = !flowing && !ended && state.length <= state.highWaterMark; // state.emittedReadable = needReadable; - state[kBitfield] = state[kBitfield] | (needReadable ? needReadableMask : 0); + state[kBitfield] = state[kBitfield] | (needReadable ? kNeedReadableMask : 0); flow(stream); } @@ -811,12 +821,13 @@ function emitReadable_(stream) { // it's in progress. // However, if we're not ended, or reading, and the length < hwm, // then go ahead and try to read some more preemptively. -function maybeReadMore(stream, state) { - if (!(state[kBitfield] & readingMoreMask)) { +function maybeReadMore(stream, state, bitfield) { + if (!(bitfield & kReadingMoreMask)) { // state.readingMore = readingMore; - state[kBitfield] = state[kBitfield] | readingMoreMask; + bitfield = bitfield | kReadingMoreMask; process.nextTick(maybeReadMore_, stream, state); } + return bitfield; } function maybeReadMore_(stream, state) { @@ -843,9 +854,9 @@ function maybeReadMore_(stream, state) { // called push() with new data. In this case we skip performing more // read()s. The execution ends in this method again after the _read() ends // up calling push() with more data. - while (!(state[kBitfield] & (readingMask | endedMask)) && + while (!(state[kBitfield] & (kReadingMask | kEndedMask)) && (state.length < state.highWaterMark || - ((state[kBitfield] & flowingTrueMask) && state.length === 0))) { + ((state[kBitfield] & kFlowingTrueMask) && state.length === 0))) { const len = state.length; debug('maybeReadMore read 0'); stream.read(0); @@ -854,7 +865,7 @@ function maybeReadMore_(stream, state) { break; } // state.readingMore = false; - state[kBitfield] = state[kBitfield] & ~readingMoreMask; + state[kBitfield] = state[kBitfield] & ~kReadingMoreMask; } // Abstract method. to be overridden in specific implementation classes. @@ -877,7 +888,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest !== process.stderr; const endFn = doEnd ? onend : unpipe; - if (state[kBitfield] & endEmittedMask) + if (state[kBitfield] & kEndEmittedMask) process.nextTick(endFn); else src.once('end', endFn); @@ -925,7 +936,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // flowing again. // So, if this is awaiting a drain, then we just call it now. // If we don't know, then assume that we are waiting for one. - if ((state[kBitfield] & awaitDrainMask) && + if ((state[kBitfield] & kAwaitDrainMask) && (!dest._writableState || dest._writableState.needDrain)) ondrain(); } @@ -941,7 +952,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // also returned false. // => Check whether `dest` is still a piping destination. if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) { - debug('false write response, pause', state.awaitDrain); + debug('false write response, pause', + state[kBitfield] & kAwaitDrainMask); // state.awaitDrain++; state[kBitfield]++; @@ -985,7 +997,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.emit('pipe', src); // Start the flow if it hasn't been started already. - if (!(state[kBitfield] & flowingTrueMask)) { + if (!(state[kBitfield] & kFlowingTrueMask)) { debug('pipe resume'); src.resume(); } @@ -997,15 +1009,15 @@ function pipeOnDrain(src) { return function pipeOnDrainFunctionResult() { const state = src._readableState; - debug('pipeOnDrain', state.awaitDrain); - if (state[kBitfield] & awaitDrainMask) { + debug('pipeOnDrain', state[kBitfield] & kAwaitDrainMask); + if (state[kBitfield] & kAwaitDrainMask) { // state.awaitDrain--; state[kBitfield]--; } - if (!(state[kBitfield] & awaitDrainMask) && EE.listenerCount(src, 'data')) { + if (!(state[kBitfield] & kAwaitDrainMask) && EE.listenerCount(src, 'data')) { // state.flowing = true; - state[kBitfield] = state[kBitfield] | flowingTrueMask; + state[kBitfield] = state[kBitfield] | kFlowingTrueMask; flow(src); } }; @@ -1024,7 +1036,8 @@ Readable.prototype.unpipe = function(dest) { // remove all. var dests = state.pipes; state.pipes = []; - state[kBitfield] = state[kBitfield] & ~flowingMask; + // state.flowing = false; + state[kBitfield] = state[kBitfield] & ~kFlowingMask; for (var i = 0; i < dests.length; i++) dests[i].emit('unpipe', this, { hasUnpiped: false }); @@ -1037,8 +1050,10 @@ Readable.prototype.unpipe = function(dest) { return this; state.pipes.splice(index, 1); - if (state.pipes.length === 0) - state[kBitfield] = state[kBitfield] & ~flowingMask; + if (state.pipes.length === 0) { + // state.flowing = false; + state[kBitfield] = state[kBitfield] & ~kFlowingMask; + } dest.emit('unpipe', this, unpipeInfo); @@ -1055,24 +1070,24 @@ Readable.prototype.on = function(ev, fn) { // Update readableListening so that resume() may be a no-op // a few lines down. This is needed to support once('readable'). // state.readableListening = this.listenerCount('readable') > 0; - state[kBitfield] = (state[kBitfield] & ~readableListeningMask) | - (this.listenerCount('readable') > 0 ? readableListeningMask : 0); + state[kBitfield] = (state[kBitfield] & ~kReadableListeningMask) | + (this.listenerCount('readable') > 0 ? kReadableListeningMask : 0); // Try start flowing on next tick if stream isn't explicitly paused - if (state[kBitfield] & flowingMask) + if (state[kBitfield] & kFlowingMask) this.resume(); } else if (ev === 'readable') { - if (!(state[kBitfield] & (endEmittedMask | readableListeningMask))) { + if (!(state[kBitfield] & (kEndEmittedMask | kReadableListeningMask))) { // state.readableListening = state.needReadable = true; // state.flowing = false; // state.emittedReadable = false; state[kBitfield] = (state[kBitfield] & - ~(emittedReadableMask | flowingMask)) | - readableListeningMask | needReadableMask; - debug('on readable', state.length, state.reading); + ~(kEmittedReadableMask | kFlowingMask)) | + kReadableListeningMask | kNeedReadableMask; + debug('on readable', state.length, state[kBitfield] & kReadingMask); if (state.length) { - emitReadable(this); - } else if (!(state[kBitfield] & readingMask)) { + state[kBitfield] = emitReadable(this, state[kBitfield]); + } else if (!(state[kBitfield] & kReadingMask)) { process.nextTick(nReadingNextTick, this); } } @@ -1116,20 +1131,21 @@ Readable.prototype.removeAllListeners = function(ev) { function updateReadableListening(self) { const state = self._readableState; + let bitfield = state[kBitfield]; // state.readableListening = self.listenerCount('readable') > 0; - state[kBitfield] = (state[kBitfield] & ~readableListeningMask) | - (self.listenerCount('readable') > 0 ? readableListeningMask : 0); + bitfield = (bitfield & ~kReadableListeningMask) | + (self.listenerCount('readable') > 0 ? kReadableListeningMask : 0); - if ((state[kBitfield] & resumeScheduledMask) && - !(state[kBitfield] & pausedMask)) { + if ((bitfield & kResumeScheduledMask) && + !(bitfield & kPausedMask)) { // Flowing needs to be set to true now, otherwise // the upcoming resume will not flow. // state.flowing = true; - state[kBitfield] = state[kBitfield] | flowingTrueMask; - + state[kBitfield] = bitfield | kFlowingTrueMask; // Crude way to check if we should resume } else if (self.listenerCount('data') > 0) { + state[kBitfield] = bitfield; self.resume(); } } @@ -1143,64 +1159,62 @@ function nReadingNextTick(self) { // If the user uses them, then switch into old mode. Readable.prototype.resume = function() { const state = this._readableState; + let bitfield = state[kBitfield]; - if (!(state[kBitfield] & flowingTrueMask)) { + if (!(bitfield & kFlowingTrueMask)) { debug('resume'); // We flow only if there is no one listening // for readable, but we still have to call // resume() - const readableListening = state[kBitfield] & readableListeningMask; + const readableListening = bitfield & kReadableListeningMask; // state.flowing = !readableListening; - state[kBitfield] = (state[kBitfield] & ~flowingMask) | - (!readableListening ? flowingTrueMask : 0); - resume(this, state); + bitfield = (bitfield & ~kFlowingMask) | + (!readableListening ? kFlowingTrueMask : 0); + + if (!(bitfield & kResumeScheduledMask)) { + // state.resumeScheduled = true; + bitfield = bitfield | kResumeScheduledMask; + process.nextTick(resume_, this, state); + } } // state.paused = false; - state[kBitfield] = state[kBitfield] & ~pausedMask; + state[kBitfield] = bitfield & ~kPausedMask; return this; }; -function resume(stream, state) { - if (!(state[kBitfield] & resumeScheduledMask)) { - // state.resumeScheduled = true; - state[kBitfield] = state[kBitfield] | resumeScheduledMask; - process.nextTick(resume_, stream, state); - } -} - function resume_(stream, state) { - debug('resume', state.reading); - if (!(state[kBitfield] & readingMask)) { + debug('resume', state[kBitfield] & kReadingMask); + if (!(state[kBitfield] & kReadingMask)) { stream.read(0); } // state.resumeScheduled = false; - state[kBitfield] = state[kBitfield] & ~resumeScheduledMask; + state[kBitfield] = state[kBitfield] & ~kResumeScheduledMask; stream.emit('resume'); flow(stream); - if ((state[kBitfield] & flowingTrueMask) && - !(state[kBitfield] & readingMask)) + if ((state[kBitfield] & kFlowingTrueMask) && + !(state[kBitfield] & kReadingMask)) stream.read(0); } Readable.prototype.pause = function() { const state = this._readableState; - debug('call pause flowing=%j', state.flowing); - if (state[kBitfield] & flowingMask) { + debug('call pause flowing=%j', state[kBitfield] & kFlowingMask); + if (state[kBitfield] & kFlowingMask) { debug('pause'); // state.flowing = false; - state[kBitfield] = state[kBitfield] & ~flowingMask; + state[kBitfield] = state[kBitfield] & ~kFlowingMask; this.emit('pause'); } // state.paused = true; - state[kBitfield] = state[kBitfield] | pausedMask; + state[kBitfield] = state[kBitfield] | kPausedMask; return this; }; function flow(stream) { const state = stream._readableState; - debug('flow', state.flowing); - while ((state[kBitfield] & flowingTrueMask) && stream.read() !== null); + debug('flow', state[kBitfield] & kFlowingTrueMask); + while ((state[kBitfield] & kFlowingTrueMask) && stream.read() !== null); } // Wrap an old-style stream as the async data source. @@ -1212,7 +1226,7 @@ Readable.prototype.wrap = function(stream) { stream.on('end', () => { debug('wrapped end'); - if (state.decoder && !(state[kBitfield] & endedMask)) { + if (state.decoder && !(state[kBitfield] & kEndedMask)) { var chunk = state.decoder.end(); if (chunk && chunk.length) this.push(chunk); @@ -1226,7 +1240,7 @@ Readable.prototype.wrap = function(stream) { if (state.decoder) chunk = state.decoder.write(chunk); - const objectMode = state[kBitfield] & objectModeMask; + const objectMode = state[kBitfield] & kObjectModeMask; // Don't skip over falsy values in objectMode if (objectMode && (chunk === null || chunk === undefined)) return; @@ -1343,13 +1357,13 @@ Object.defineProperty(Readable.prototype, 'readableEncoding', { // Length is the combined lengths of all the buffers in the list. // This function is designed to be inlinable, so please take care when making // changes to the function body. -function fromList(n, state) { +function fromList(n, state, objectMode = state[kBitfield] & kObjectModeMask) { // nothing buffered if (state.length === 0) return null; var ret; - if (state[kBitfield] & objectModeMask) + if (objectMode) ret = state.buffer.shift(); else if (!n || n >= state.length) { // Read it all, truncate the list @@ -1368,28 +1382,29 @@ function fromList(n, state) { return ret; } -function endReadable(stream) { +function endReadable(stream, bitfield) { const state = stream._readableState; - debug('endReadable', state.endEmitted); - if (!(state[kBitfield] & endEmittedMask)) { + debug('endReadable', bitfield & kEndEmittedMask); + if (!(bitfield & kEndEmittedMask)) { // state.ended = true; - state[kBitfield] = state[kBitfield] | endedMask; + bitfield = bitfield | kEndedMask; process.nextTick(endReadableNT, state, stream); } + return bitfield; } function endReadableNT(state, stream) { - debug('endReadableNT', state.endEmitted, state.length); + debug('endReadableNT', state[kBitfield] & kEndEmittedMask, state.length); // Check that we didn't get one last unshift. - if (!(state[kBitfield] & endEmittedMask) && state.length === 0) { + if (!(state[kBitfield] & kEndEmittedMask) && state.length === 0) { // state.endEmitted = true; - state[kBitfield] = state[kBitfield] | endEmittedMask; + state[kBitfield] = state[kBitfield] | kEndEmittedMask; stream.readable = false; stream.emit('end'); - if (state[kBitfield] & autoDestroyMask) { + if (state[kBitfield] & kAutoDestroyMask) { // In case of duplex streams we need a way to detect // if the writable side is ready for autoDestroy as well const wState = stream._writableState;