Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: improve readable push performance #13113

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions benchmark/streams/readable-boundaryread.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ const common = require('../common');
const Readable = require('stream').Readable;

const bench = common.createBenchmark(main, {
n: [200e1]
n: [200e1],
type: ['string', 'buffer']
});

function main(conf) {
const n = +conf.n;
const b = new Buffer(32);
const s = new Readable();
function noop() {}
s._read = noop;
var data = 'a'.repeat(32);
if (conf.type === 'buffer')
data = Buffer.from(data);
s._read = function() {};

bench.start();
for (var k = 0; k < n; ++k) {
for (var i = 0; i < 1e4; ++i)
s.push(b);
s.push(data);
while (s.read(32));
}
bench.end(n);
Expand Down
141 changes: 74 additions & 67 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,81 +177,97 @@ Readable.prototype._destroy = function(err, cb) {
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
var state = this._readableState;

if (!state.objectMode && typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (encoding !== state.encoding) {
chunk = Buffer.from(chunk, encoding);
encoding = '';
var skipChunkCheck;

if (!state.objectMode) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (encoding !== state.encoding) {
chunk = Buffer.from(chunk, encoding);
encoding = '';
}
skipChunkCheck = true;
}
} else {
skipChunkCheck = true;
}

return readableAddChunk(this, state, chunk, encoding, false);
return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};

// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function(chunk) {
var state = this._readableState;
return readableAddChunk(this, state, chunk, '', true);
};

Readable.prototype.isPaused = function() {
return this._readableState.flowing === false;
return readableAddChunk(this, chunk, null, true, false);
};

function readableAddChunk(stream, state, chunk, encoding, addToFront) {
var er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
} else if (chunk === null) {
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
var state = stream._readableState;
if (chunk === null) {
state.reading = false;
onEofChunk(stream, state);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (state.ended && !addToFront) {
const e = new Error('stream.push() after EOF');
stream.emit('error', e);
} else if (state.endEmitted && addToFront) {
const e = new Error('stream.unshift() after end event');
stream.emit('error', e);
} else {
var skipAdd;
if (state.decoder && !addToFront && !encoding) {
chunk = state.decoder.write(chunk);
skipAdd = (!state.objectMode && chunk.length === 0);
}

if (!addToFront)
} else {
var er;
if (!skipChunkCheck)
er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (addToFront) {
if (state.endEmitted)
stream.emit('error', new Error('stream.unshift() after end event'));
else
addChunk(stream, state, chunk, true);
} else if (state.ended) {
stream.emit('error', new Error('stream.push() after EOF'));
} else {
state.reading = false;

// Don't add to the buffer if we've decoded to an empty string chunk and
// we're not in object mode
if (!skipAdd) {
// if we want the data now, just emit it.
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
stream.read(0);
} else {
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
if (addToFront)
state.buffer.unshift(chunk);
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
addChunk(stream, state, chunk, false);
else
state.buffer.push(chunk);

if (state.needReadable)
emitReadable(stream);
maybeReadMore(stream, state);
} else {
addChunk(stream, state, chunk, false);
}
}

maybeReadMore(stream, state);
} else if (!addToFront) {
state.reading = false;
}
} else if (!addToFront) {
state.reading = false;
}

return needMoreData(state);
}

function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
stream.read(0);
} else {
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
if (addToFront)
state.buffer.unshift(chunk);
else
state.buffer.push(chunk);

if (state.needReadable)
emitReadable(stream);
}
maybeReadMore(stream, state);
}

function chunkInvalid(state, chunk) {
var er;
if (!(chunk instanceof Buffer) &&
typeof chunk !== 'string' &&
chunk !== undefined &&
!state.objectMode) {
er = new TypeError('Invalid non-string/buffer chunk');
}
return er;
}


// if it's past the high water mark, we can push in some more.
// Also, if we have no data yet, we can stand some
Expand All @@ -267,6 +283,10 @@ function needMoreData(state) {
state.length === 0);
}

Readable.prototype.isPaused = function() {
return this._readableState.flowing === false;
};

// backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
if (!StringDecoder)
Expand Down Expand Up @@ -438,19 +458,6 @@ Readable.prototype.read = function(n) {
return ret;
};

function chunkInvalid(state, chunk) {
var er = null;
if (!(chunk instanceof Buffer) &&
typeof chunk !== 'string' &&
chunk !== null &&
chunk !== undefined &&
!state.objectMode) {
er = new TypeError('Invalid non-string/buffer chunk');
}
return er;
}


function onEofChunk(stream, state) {
if (state.ended) return;
if (state.decoder) {
Expand Down