diff --git a/benchmark/streams/readable-boundaryread.js b/benchmark/streams/readable-boundaryread.js index 1a0b7eb7ac9ddc..4834da0a2c5bf8 100644 --- a/benchmark/streams/readable-boundaryread.js +++ b/benchmark/streams/readable-boundaryread.js @@ -4,20 +4,22 @@ const common = require('../common'); const Readable = require('stream').Readable; const bench = common.createBenchmark(main, { - n: [200e1] + n: [200e1], + type: ['string', 'buffer'] }); function main(conf) { const n = +conf.n; - const b = new Buffer(32); const s = new Readable(); - function noop() {} - s._read = noop; + var data = 'a'.repeat(32); + if (conf.type === 'buffer') + data = Buffer.from(data); + s._read = function() {}; bench.start(); for (var k = 0; k < n; ++k) { for (var i = 0; i < 1e4; ++i) - s.push(b); + s.push(data); while (s.read(32)); } bench.end(n); diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 8b0d45cc86ccf5..9666c7d6fb2a18 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -177,81 +177,97 @@ Readable.prototype._destroy = function(err, cb) { // write() some more. Readable.prototype.push = function(chunk, encoding) { var state = this._readableState; - - if (!state.objectMode && typeof chunk === 'string') { - encoding = encoding || state.defaultEncoding; - if (encoding !== state.encoding) { - chunk = Buffer.from(chunk, encoding); - encoding = ''; + var skipChunkCheck; + + if (!state.objectMode) { + if (typeof chunk === 'string') { + encoding = encoding || state.defaultEncoding; + if (encoding !== state.encoding) { + chunk = Buffer.from(chunk, encoding); + encoding = ''; + } + skipChunkCheck = true; } + } else { + skipChunkCheck = true; } - return readableAddChunk(this, state, chunk, encoding, false); + return readableAddChunk(this, chunk, encoding, false, skipChunkCheck); }; // Unshift should *always* be something directly out of read() Readable.prototype.unshift = function(chunk) { - var state = this._readableState; - return readableAddChunk(this, state, chunk, '', true); -}; - -Readable.prototype.isPaused = function() { - return this._readableState.flowing === false; + return readableAddChunk(this, chunk, null, true, false); }; -function readableAddChunk(stream, state, chunk, encoding, addToFront) { - var er = chunkInvalid(state, chunk); - if (er) { - stream.emit('error', er); - } else if (chunk === null) { +function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { + var state = stream._readableState; + if (chunk === null) { state.reading = false; onEofChunk(stream, state); - } else if (state.objectMode || chunk && chunk.length > 0) { - if (state.ended && !addToFront) { - const e = new Error('stream.push() after EOF'); - stream.emit('error', e); - } else if (state.endEmitted && addToFront) { - const e = new Error('stream.unshift() after end event'); - stream.emit('error', e); - } else { - var skipAdd; - if (state.decoder && !addToFront && !encoding) { - chunk = state.decoder.write(chunk); - skipAdd = (!state.objectMode && chunk.length === 0); - } - - if (!addToFront) + } else { + var er; + if (!skipChunkCheck) + er = chunkInvalid(state, chunk); + if (er) { + stream.emit('error', er); + } else if (state.objectMode || chunk && chunk.length > 0) { + if (addToFront) { + if (state.endEmitted) + stream.emit('error', new Error('stream.unshift() after end event')); + else + addChunk(stream, state, chunk, true); + } else if (state.ended) { + stream.emit('error', new Error('stream.push() after EOF')); + } else { state.reading = false; - - // Don't add to the buffer if we've decoded to an empty string chunk and - // we're not in object mode - if (!skipAdd) { - // if we want the data now, just emit it. - if (state.flowing && state.length === 0 && !state.sync) { - stream.emit('data', chunk); - stream.read(0); - } else { - // update the buffer info. - state.length += state.objectMode ? 1 : chunk.length; - if (addToFront) - state.buffer.unshift(chunk); + if (state.decoder && !encoding) { + chunk = state.decoder.write(chunk); + if (state.objectMode || chunk.length !== 0) + addChunk(stream, state, chunk, false); else - state.buffer.push(chunk); - - if (state.needReadable) - emitReadable(stream); + maybeReadMore(stream, state); + } else { + addChunk(stream, state, chunk, false); } } - - maybeReadMore(stream, state); + } else if (!addToFront) { + state.reading = false; } - } else if (!addToFront) { - state.reading = false; } return needMoreData(state); } +function addChunk(stream, state, chunk, addToFront) { + if (state.flowing && state.length === 0 && !state.sync) { + stream.emit('data', chunk); + stream.read(0); + } else { + // update the buffer info. + state.length += state.objectMode ? 1 : chunk.length; + if (addToFront) + state.buffer.unshift(chunk); + else + state.buffer.push(chunk); + + if (state.needReadable) + emitReadable(stream); + } + maybeReadMore(stream, state); +} + +function chunkInvalid(state, chunk) { + var er; + if (!(chunk instanceof Buffer) && + typeof chunk !== 'string' && + chunk !== undefined && + !state.objectMode) { + er = new TypeError('Invalid non-string/buffer chunk'); + } + return er; +} + // if it's past the high water mark, we can push in some more. // Also, if we have no data yet, we can stand some @@ -267,6 +283,10 @@ function needMoreData(state) { state.length === 0); } +Readable.prototype.isPaused = function() { + return this._readableState.flowing === false; +}; + // backwards compatibility. Readable.prototype.setEncoding = function(enc) { if (!StringDecoder) @@ -438,19 +458,6 @@ Readable.prototype.read = function(n) { return ret; }; -function chunkInvalid(state, chunk) { - var er = null; - if (!(chunk instanceof Buffer) && - typeof chunk !== 'string' && - chunk !== null && - chunk !== undefined && - !state.objectMode) { - er = new TypeError('Invalid non-string/buffer chunk'); - } - return er; -} - - function onEofChunk(stream, state) { if (state.ended) return; if (state.decoder) {