From 28b85ad559cbb82cc7e60961186eaf21abcbe010 Mon Sep 17 00:00:00 2001
From: Robert Nagy
Date: Fri, 29 Sep 2023 19:42:07 +0200
Subject: [PATCH] stream: readable bitmap

PR-URL: https://github.com/nodejs/node/pull/49963
---
 lib/internal/streams/readable.js | 289 ++++++++++++++++++++-----------
 1 file changed, 191 insertions(+), 98 deletions(-)

diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js
index 7ceb83d3f20523..a129b1b6f4b75d 100644
--- a/lib/internal/streams/readable.js
+++ b/lib/internal/streams/readable.js
@@ -72,7 +72,6 @@ const {
 } = require('internal/errors');
 const { validateObject } = require('internal/validators');
 
-const kPaused = Symbol('kPaused');
 const kState = Symbol('kState');
 
 const { StringDecoder } = require('string_decoder');
@@ -84,6 +83,11 @@ const nop = () => {};
 
 const { errorOrDestroy } = destroyImpl;
 
+const kErroredValue = Symbol('kErroredValue');
+const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
+const kDecoderValue = Symbol('kDecoderValue');
+const kEncodingValue = Symbol('kEncodingValue');
+
 const kObjectMode = 1 << 0;
 const kEnded = 1 << 1;
 const kEndEmitted = 1 << 2;
@@ -103,6 +107,14 @@ const kCloseEmitted = 1 << 15;
 const kMultiAwaitDrain = 1 << 16;
 const kReadingMore = 1 << 17;
 const kDataEmitted = 1 << 18;
+const kErrored = 1 << 19;
+const kDefaultUTF8Encoding = 1 << 20;
+const kDecoder = 1 << 21;
+const kEncoding = 1 << 22;
+const kHasFlowing = 1 << 23;
+const kFlowing = 1 << 24;
+const kHasPaused = 1 << 25;
+const kPaused = 1 << 26;
 
 // TODO(benjamingr) it is likely slower to do it this way than with free functions
 function makeBitMapDescriptor(bit) {
@@ -151,8 +163,93 @@ ObjectDefineProperties(ReadableState.prototype, {
   // If true, a maybeReadMore has been scheduled.
   readingMore: makeBitMapDescriptor(kReadingMore),
   dataEmitted: makeBitMapDescriptor(kDataEmitted),
+
+  // Indicates whether the stream has errored. When true no further
+  // _read calls, 'data' or 'readable' events should occur. This is needed
+  // since when autoDestroy is disabled we need a way to tell whether the
+  // stream has failed.
+  errored: {
+    __proto__: null,
+    enumerable: false,
+    get() {
+      return (this[kState] & kErrored) !== 0 ? this[kErroredValue] : null;
+    },
+    set(value) {
+      if (value) {
+        this[kErroredValue] = value;
+        this[kState] |= kErrored;
+      } else {
+        this[kState] &= ~kErrored;
+      }
+    },
+  },
+
+  defaultEncoding: {
+    __proto__: null,
+    enumerable: false,
+    get() { return (this[kState] & kDefaultUTF8Encoding) !== 0 ? 'utf8' : this[kDefaultEncodingValue]; },
+    set(value) {
+      if (value === 'utf8' || value === 'utf-8') {
+        this[kState] |= kDefaultUTF8Encoding;
+      } else {
+        this[kState] &= ~kDefaultUTF8Encoding;
+        this[kDefaultEncodingValue] = value;
+      }
+    },
+  },
+
+  decoder: {
+    __proto__: null,
+    enumerable: false,
+    get() {
+      return (this[kState] & kDecoder) !== 0 ? this[kDecoderValue] : null;
+    },
+    set(value) {
+      if (value) {
+        this[kDecoderValue] = value;
+        this[kState] |= kDecoder;
+      } else {
+        this[kState] &= ~kDecoder;
+      }
+    },
+  },
+
+  encoding: {
+    __proto__: null,
+    enumerable: false,
+    get() {
+      return (this[kState] & kEncoding) !== 0 ? this[kEncodingValue] : null;
+    },
+    set(value) {
+      if (value) {
+        this[kEncodingValue] = value;
+        this[kState] |= kEncoding;
+      } else {
+        this[kState] &= ~kEncoding;
+      }
+    },
+  },
+
+  flowing: {
+    __proto__: null,
+    enumerable: false,
+    get() {
+      return (this[kState] & kHasFlowing) !== 0 ?
+        (this[kState] & kFlowing) !== 0 : null;
+    },
+    set(value) {
+      if (value == null) {
+        this[kState] &= ~(kHasFlowing | kFlowing);
+      } else if (value) {
+        this[kState] |= (kHasFlowing | kFlowing);
+      } else {
+        this[kState] |= kHasFlowing;
+        this[kState] &= ~kFlowing;
+      }
+    },
+  },
 });
+
 function ReadableState(options, stream, isDuplex) {
   // Duplex streams are both readable and writable, but share
   // the same options object.
@@ -184,9 +281,6 @@ function ReadableState(options, stream, isDuplex) {
   this.buffer = new BufferList();
   this.length = 0;
   this.pipes = [];
-  this.flowing = null;
-
-  this[kPaused] = null;
 
   // Should close be emitted on destroy. Defaults to true.
   if (options && options.emitClose === false) this[kState] &= ~kEmitClose;
@@ -194,20 +288,12 @@ function ReadableState(options, stream, isDuplex) {
 
   // Should .destroy() be called after 'end' (and potentially 'finish').
   if (options && options.autoDestroy === false) this[kState] &= ~kAutoDestroy;
-
-  // Indicates whether the stream has errored. When true no further
-  // _read calls, 'data' or 'readable' events should occur. This is needed
-  // since when autoDestroy is disabled we need a way to tell whether the
-  // stream has failed.
-  this.errored = null;
-
-
   // Crypto is kind of old and crusty. Historically, its default string
   // encoding is 'binary' so we have to make this configurable.
   // Everything else in the universe uses 'utf8', though.
   const defaultEncoding = options?.defaultEncoding;
-  if (defaultEncoding == null) {
-    this.defaultEncoding = 'utf8';
+  if (defaultEncoding == null || defaultEncoding === 'utf8' || defaultEncoding === 'utf-8') {
+    this[kState] |= kDefaultUTF8Encoding;
   } else if (Buffer.isEncoding(defaultEncoding)) {
     this.defaultEncoding = defaultEncoding;
   } else {
@@ -218,8 +304,6 @@ function ReadableState(options, stream, isDuplex) {
   // type: null | Writable | Set.
   this.awaitDrainWriters = null;
 
-  this.decoder = null;
-  this.encoding = null;
   if (options && options.encoding) {
     this.decoder = new StringDecoder(options.encoding);
     this.encoding = options.encoding;
@@ -363,7 +447,6 @@ function readableAddChunkPushByteMode(stream, state, chunk, encoding) {
   if (chunk === null) {
     state[kState] &= ~kReading;
     onEofChunk(stream, state);
-
     return false;
   }
 
@@ -391,22 +474,20 @@ function readableAddChunkPushByteMode(stream, state, chunk, encoding) {
     return canPushMore(state);
   }
 
-  if (state.ended) {
+  if ((state[kState] & kEnded) !== 0) {
     errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
-
     return false;
   }
 
-  if (state.destroyed || state.errored) {
+  if ((state[kState] & (kDestroyed | kErrored)) !== 0) {
     return false;
  }
 
   state[kState] &= ~kReading;
-  if (state.decoder && !encoding) {
-    chunk = state.decoder.write(chunk);
+  if ((state[kState] & kDecoder) !== 0 && !encoding) {
+    chunk = state[kDecoderValue].write(chunk);
     if (chunk.length === 0) {
       maybeReadMore(stream, state);
-
       return canPushMore(state);
     }
   }
@@ -419,22 +500,22 @@ function readableAddChunkPushObjectMode(stream, state, chunk, encoding) {
   if (chunk === null) {
     state[kState] &= ~kReading;
     onEofChunk(stream, state);
-
     return false;
   }
 
-  if (state.ended) {
+  if ((state[kState] & kEnded) !== 0) {
     errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
     return false;
   }
 
-  if (state.destroyed || state.errored) {
+  if ((state[kState] & (kDestroyed | kErrored)) !== 0) {
     return false;
   }
 
   state[kState] &= ~kReading;
+
-  if (state.decoder && !encoding) {
-    chunk = state.decoder.write(chunk);
+  if ((state[kState] & kDecoder) !== 0 && !encoding) {
+    chunk = state[kDecoderValue].write(chunk);
   }
 
   addChunk(stream, state, chunk, false);
@@ -445,12 +526,12 @@ function canPushMore(state) {
   // We can push more data if we are below the highWaterMark.
   // Also, if we have no data yet, we can stand some more bytes.
   // This is to work around cases where hwm=0, such as the repl.
-  return !state.ended &&
+  return (state[kState] & kEnded) === 0 &&
     (state.length < state.highWaterMark || state.length === 0);
 }
 
 function addChunk(stream, state, chunk, addToFront) {
-  if (state.flowing && state.length === 0 && !state.sync &&
+  if ((state[kState] & (kFlowing | kSync)) === kFlowing && state.length === 0 &&
       stream.listenerCount('data') > 0) {
     // Use the guard to avoid creating `Set()` repeatedly
     // when we have multiple pipes.
@@ -460,11 +541,11 @@ function addChunk(stream, state, chunk, addToFront) {
       state.awaitDrainWriters = null;
     }
 
-    state.dataEmitted = true;
+    state[kState] |= kDataEmitted;
     stream.emit('data', chunk);
   } else {
     // Update the buffer info.
-    state.length += state.objectMode ? 1 : chunk.length;
+    state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
     if (addToFront)
       state.buffer.unshift(chunk);
     else
@@ -478,7 +559,7 @@ Readable.prototype.isPaused = function() {
   const state = this._readableState;
-  return state[kPaused] === true || state.flowing === false;
+  return (state[kState] & kPaused) !== 0 || (state[kState] & (kHasFlowing | kFlowing)) === kHasFlowing;
 };
 
 // Backwards compatibility.
@@ -529,13 +610,13 @@ function howMuchToRead(n, state) {
     return 1;
   if (NumberIsNaN(n)) {
     // Only flow one buffer at a time.
-    if (state.flowing && state.length)
+    if ((state[kState] & kFlowing) !== 0 && state.length)
      return state.buffer.first().length;
    return state.length;
   }
 
   if (n <= state.length)
     return n;
-  return state.ended ? state.length : 0;
+  return (state[kState] & kEnded) !== 0 ? state.length : 0;
 }
 
 // You can override either this method, or the async _read(n) below.
@@ -562,13 +643,13 @@ Readable.prototype.read = function(n) {
   // already have a bunch of data in the buffer, then just trigger
   // the 'readable' event and move on.
   if (n === 0 &&
-      state.needReadable &&
+      (state[kState] & kNeedReadable) !== 0 &&
       ((state.highWaterMark !== 0 ?
         state.length >= state.highWaterMark :
         state.length > 0) ||
-       state.ended)) {
-    debug('read: emitReadable', state.length, state.ended);
-    if (state.length === 0 && state.ended)
+       (state[kState] & kEnded) !== 0)) {
+    debug('read: emitReadable', state.length, (state[kState] & kEnded) !== 0);
+    if (state.length === 0 && (state[kState] & kEnded) !== 0)
       endReadable(this);
     else
       emitReadable(this);
@@ -578,7 +659,7 @@ Readable.prototype.read = function(n) {
   n = howMuchToRead(n, state);
 
   // If we've ended, and we're now clear, then finish it up.
-  if (n === 0 && state.ended) {
+  if (n === 0 && (state[kState] & kEnded) !== 0) {
     if (state.length === 0)
       endReadable(this);
     return null;
@@ -619,8 +700,7 @@ Readable.prototype.read = function(n) {
   // However, if we've ended, then there's no point, if we're already
   // reading, then it's unnecessary, if we're constructing we have to wait,
   // and if we're destroyed or errored, then it's not allowed,
-  if (state.ended || state.reading || state.destroyed || state.errored ||
-      !state.constructed) {
+  if ((state[kState] & (kReading | kEnded | kDestroyed | kErrored | kConstructed)) !== kConstructed) {
     doRead = false;
     debug('reading, ended or constructing', doRead);
   } else if (doRead) {
@@ -640,7 +720,7 @@ Readable.prototype.read = function(n) {
 
     // If _read pushed data synchronously, then `reading` will be false,
     // and we need to re-evaluate how much data we can return to the user.
-    if (!state.reading)
+    if ((state[kState] & kReading) === 0)
       n = howMuchToRead(nOrig, state);
   }
 
@@ -651,11 +731,11 @@ Readable.prototype.read = function(n) {
     ret = null;
 
   if (ret === null) {
-    state.needReadable = state.length <= state.highWaterMark;
+    state[kState] |= state.length <= state.highWaterMark ? kNeedReadable : 0;
     n = 0;
   } else {
     state.length -= n;
-    if (state.multiAwaitDrain) {
+    if ((state[kState] & kMultiAwaitDrain) !== 0) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
@@ -665,16 +745,16 @@ Readable.prototype.read = function(n) {
   if (state.length === 0) {
     // If we have nothing in the buffer, then we want to know
     // as soon as we *do* get something into the buffer.
-    if (!state.ended)
-      state.needReadable = true;
+    if ((state[kState] & kEnded) === 0)
+      state[kState] |= kNeedReadable;
 
     // If we tried to read() past the EOF, then emit end on the next tick.
-    if (nOrig !== n && state.ended)
+    if (nOrig !== n && (state[kState] & kEnded) !== 0)
       endReadable(this);
   }
 
-  if (ret !== null && !state.errorEmitted && !state.closeEmitted) {
-    state.dataEmitted = true;
+  if (ret !== null && (state[kState] & (kErrorEmitted | kCloseEmitted)) === 0) {
+    state[kState] |= kDataEmitted;
     this.emit('data', ret);
   }
 
@@ -683,25 +763,26 @@ function onEofChunk(stream, state) {
   debug('onEofChunk');
-  if (state.ended) return;
-  if (state.decoder) {
-    const chunk = state.decoder.end();
+  if ((state[kState] & kEnded) !== 0) return;
+  const decoder = (state[kState] & kDecoder) !== 0 ?
+    state[kDecoderValue] : null;
+  if (decoder) {
+    const chunk = decoder.end();
     if (chunk && chunk.length) {
       state.buffer.push(chunk);
-      state.length += state.objectMode ? 1 : chunk.length;
+      state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
     }
   }
-  state.ended = true;
+  state[kState] |= kEnded;
 
-  if (state.sync) {
+  if ((state[kState] & kSync) !== 0) {
     // If we are sync, wait until next tick to emit the data.
     // Otherwise we risk emitting data in the flow()
     // the readable code triggers during a read() call.
     emitReadable(stream);
   } else {
     // Emit 'readable' now to make sure it gets picked up.
-    state.needReadable = false;
-    state.emittedReadable = true;
+    state[kState] &= ~kNeedReadable;
+    state[kState] |= kEmittedReadable;
     // We have to emit readable now that we are EOF. Modules
     // in the ecosystem (e.g. dicer) rely on this event being sync.
     emitReadable_(stream);
@@ -713,21 +794,21 @@ function onEofChunk(stream, state) {
 // a nextTick recursion warning, but that's not so bad.
 function emitReadable(stream) {
   const state = stream._readableState;
-  debug('emitReadable', state.needReadable, state.emittedReadable);
-  state.needReadable = false;
-  if (!state.emittedReadable) {
-    debug('emitReadable', state.flowing);
-    state.emittedReadable = true;
+  debug('emitReadable');
+  state[kState] &= ~kNeedReadable;
+  if ((state[kState] & kEmittedReadable) === 0) {
+    debug('emitReadable', (state[kState] & kFlowing) !== 0);
+    state[kState] |= kEmittedReadable;
     process.nextTick(emitReadable_, stream);
   }
 }
 
 function emitReadable_(stream) {
   const state = stream._readableState;
-  debug('emitReadable_', state.destroyed, state.length, state.ended);
-  if (!state.destroyed && !state.errored && (state.length || state.ended)) {
+  debug('emitReadable_');
+  if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || state.ended)) {
     stream.emit('readable');
-    state.emittedReadable = false;
+    state[kState] &= ~kEmittedReadable;
   }
 
   // The stream needs another readable event if:
@@ -736,10 +817,9 @@ function emitReadable_(stream) {
   // 2. It is not ended.
   // 3. It is below the highWaterMark, so we can schedule
   // another readable later.
-  state.needReadable =
-    !state.flowing &&
-    !state.ended &&
-    state.length <= state.highWaterMark;
+  state[kState] |=
+    (state[kState] & (kFlowing | kEnded)) === 0 &&
+    state.length <= state.highWaterMark ? kNeedReadable : 0;
   flow(stream);
 }
 
@@ -751,8 +831,8 @@ function emitReadable_(stream) {
 // However, if we're not ended, or reading, and the length < hwm,
 // then go ahead and try to read some more preemptively.
 function maybeReadMore(stream, state) {
-  if (!state.readingMore && state.constructed) {
-    state.readingMore = true;
+  if ((state[kState] & (kReadingMore | kConstructed)) === kConstructed) {
+    state[kState] |= kReadingMore;
     process.nextTick(maybeReadMore_, stream, state);
   }
 }
@@ -781,9 +861,9 @@ function maybeReadMore_(stream, state) {
   // called push() with new data. In this case we skip performing more
   // read()s. The execution ends in this method again after the _read() ends
   // up calling push() with more data.
-  while (!state.reading && !state.ended &&
+  while ((state[kState] & (kReading | kEnded)) === 0 &&
          (state.length < state.highWaterMark ||
-          (state.flowing && state.length === 0))) {
+          ((state[kState] & kFlowing) !== 0 && state.length === 0))) {
     const len = state.length;
     debug('maybeReadMore read 0');
     stream.read(0);
     if (len === state.length) {
       // Didn't get any data, stop spinning.
       break;
     }
   }
-  state.readingMore = false;
+  state[kState] &= ~kReadingMore;
 }
 
 // Abstract method. to be overridden in specific implementation classes.
@@ -808,7 +888,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
 
   if (state.pipes.length === 1) {
     if (!state.multiAwaitDrain) {
-      state.multiAwaitDrain = true;
+      state[kState] |= kMultiAwaitDrain;
       state.awaitDrainWriters = new SafeSet(
         state.awaitDrainWriters ? [state.awaitDrainWriters] : [],
       );
@@ -1089,16 +1169,16 @@ function updateReadableListening(self) {
   const state = self._readableState;
   state.readableListening = self.listenerCount('readable') > 0;
 
-  if (state.resumeScheduled && state[kPaused] === false) {
+  if ((state[kState] & (kHasPaused | kPaused | kResumeScheduled)) === (kHasPaused | kResumeScheduled)) {
     // Flowing needs to be set to true now, otherwise
     // the upcoming resume will not flow.
-    state.flowing = true;
+    state[kState] |= kHasFlowing | kFlowing;
 
     // Crude way to check if we should resume.
   } else if (self.listenerCount('data') > 0) {
     self.resume();
-  } else if (!state.readableListening) {
-    state.flowing = null;
+  } else if ((state[kState] & kReadableListening) === 0) {
+    state[kState] &= ~(kHasFlowing | kFlowing);
   }
 }
 
@@ -1111,15 +1191,21 @@ function nReadingNextTick(self) {
 // If the user uses them, then switch into old mode.
 Readable.prototype.resume = function() {
   const state = this._readableState;
-  if (!state.flowing) {
+  if ((state[kState] & kFlowing) === 0) {
     debug('resume');
     // We flow only if there is no one listening
     // for readable, but we still have to call
     // resume().
-    state.flowing = !state.readableListening;
+    state[kState] |= kHasFlowing;
+    if (!state.readableListening) {
+      state[kState] |= kFlowing;
+    } else {
+      state[kState] &= ~kFlowing;
+    }
     resume(this, state);
   }
-  state[kPaused] = false;
+  state[kState] |= kHasPaused;
+  state[kState] &= ~kPaused;
   return this;
 };
 
@@ -1131,33 +1217,35 @@ function resume(stream, state) {
 }
 
 function resume_(stream, state) {
-  debug('resume', state.reading);
-  if (!state.reading) {
+  debug('resume', (state[kState] & kReading) !== 0);
+  if ((state[kState] & kReading) === 0) {
     stream.read(0);
   }
 
-  state.resumeScheduled = false;
+  state[kState] &= ~kResumeScheduled;
   stream.emit('resume');
   flow(stream);
-  if (state.flowing && !state.reading)
+  if ((state[kState] & (kFlowing | kReading)) === kFlowing)
     stream.read(0);
 }
 
 Readable.prototype.pause = function() {
-  debug('call pause flowing=%j', this._readableState.flowing);
-  if (this._readableState.flowing !== false) {
+  const state = this._readableState;
+  debug('call pause');
+  if (state.flowing !== false) {
     debug('pause');
-    this._readableState.flowing = false;
+    state[kState] |= kHasFlowing;
+    state[kState] &= ~kFlowing;
     this.emit('pause');
   }
-  this._readableState[kPaused] = true;
+  state[kState] |= kHasPaused | kPaused;
   return this;
 };
 
 function flow(stream) {
   const state = stream._readableState;
-  debug('flow', state.flowing);
-  while (state.flowing && stream.read() !== null);
+  debug('flow');
+  while ((state[kState] & kFlowing) !== 0 && stream.read() !== null);
 }
 
 // Wrap an old-style stream as the async data source.
@@ -1436,10 +1524,15 @@ ObjectDefineProperties(ReadableState.prototype, {
   paused: {
     __proto__: null,
     get() {
-      return this[kPaused] !== false;
+      return (this[kState] & kPaused) !== 0;
     },
     set(value) {
-      this[kPaused] = !!value;
+      this[kState] |= kHasPaused;
+      if (value) {
+        this[kState] |= kPaused;
+      } else {
+        this[kState] &= ~kPaused;
+      }
     },
   },
 });
@@ -1479,9 +1572,9 @@ function fromList(n, state) {
 
 function endReadable(stream) {
   const state = stream._readableState;
-  debug('endReadable', state.endEmitted);
-  if (!state.endEmitted) {
-    state.ended = true;
+  debug('endReadable', (state[kState] & kEndEmitted) !== 0);
+  if ((state[kState] & kEndEmitted) === 0) {
+    state[kState] |= kEnded;
     process.nextTick(endReadableNT, state, stream);
   }
 }
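
The patch above replaces a collection of per-instance boolean and nullable properties on ReadableState with bits packed into the single state[kState] integer, plus symbol-keyed slots (kErroredValue, kDecoderValue, kEncodingValue, kDefaultEncodingValue) that are only populated when the matching bit is set. Below is a minimal, self-contained sketch of that pattern for illustration only; the names mirror the diff, but this is not Node's internal module and Node's actual makeBitMapDescriptor may differ in detail.

'use strict';

const kState = Symbol('kState');
const kErroredValue = Symbol('kErroredValue');

// One bit per boolean field, packed into a single small integer.
const kEnded = 1 << 0;
const kReading = 1 << 1;
const kErrored = 1 << 2;

// Boolean fields become accessors over the shared bit field.
function makeBitMapDescriptor(bit) {
  return {
    enumerable: false,
    get() { return (this[kState] & bit) !== 0; },
    set(value) {
      if (value) this[kState] |= bit;
      else this[kState] &= ~bit;
    },
  };
}

class State {
  constructor() {
    this[kState] = 0; // all flags start cleared
  }
}

Object.defineProperties(State.prototype, {
  ended: makeBitMapDescriptor(kEnded),
  reading: makeBitMapDescriptor(kReading),

  // Non-boolean fields keep a "present" bit plus a symbol-keyed slot for the
  // value, so the common null/absent case needs no extra own property.
  errored: {
    enumerable: false,
    get() {
      return (this[kState] & kErrored) !== 0 ? this[kErroredValue] : null;
    },
    set(value) {
      if (value) {
        this[kErroredValue] = value;
        this[kState] |= kErrored;
      } else {
        this[kState] &= ~kErrored;
      }
    },
  },
});

// Usage: reads and writes look like plain properties, while hot-path checks
// can combine several flags into a single mask test on state[kState].
const s = new State();
s.reading = true;
s.errored = new Error('boom');
console.log(s.reading, s.ended, s.errored?.message); // true false 'boom'
console.log((s[kState] & (kEnded | kErrored)) !== 0); // true: ended or errored

The payoff in the diff shows up in checks such as (state[kState] & (kDestroyed | kErrored)) !== 0, where formerly separate property reads collapse into one bitwise test, and in the kHasFlowing/kFlowing and kHasPaused/kPaused pairs, which encode the legacy three-state null/true/false values in two bits each.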