Skip to content

Commit

Permalink
stream: simplify push
Browse files Browse the repository at this point in the history
PR-URL: #31150
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
ronag authored and codebytere committed Mar 17, 2020
1 parent 7a8963b commit 6e76752
Showing 1 changed file with 33 additions and 52 deletions.
85 changes: 33 additions & 52 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
debug('readableAddChunk', chunk);
const state = stream._readableState;

let skipChunkCheck;

let err;
if (!state.objectMode) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
Expand All @@ -231,54 +230,47 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
chunk = Buffer.from(chunk, encoding);
encoding = '';
}
skipChunkCheck = true;
} else if (chunk instanceof Buffer) {
encoding = '';
} else if (Stream._isUint8Array(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
encoding = '';
} else if (chunk != null) {
err = new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
}
} else {
skipChunkCheck = true;
}

if (chunk === null) {
if (err) {
errorOrDestroy(stream, err);
} else if (chunk === null) {
state.reading = false;
onEofChunk(stream, state);
} else {
var er;
if (!skipChunkCheck)
er = chunkInvalid(state, chunk);
if (er) {
errorOrDestroy(stream, er);
} else if (state.objectMode || (chunk && chunk.length > 0)) {
if (typeof chunk !== 'string' &&
!state.objectMode &&
// Do not use Object.getPrototypeOf as it is slower since V8 7.3.
!(chunk instanceof Buffer)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
}

if (addToFront) {
if (state.endEmitted)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
} else if (state.objectMode || (chunk && chunk.length > 0)) {
if (addToFront) {
if (state.endEmitted)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else
addChunk(stream, state, chunk, true);
} else if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed) {
return false;
} else {
state.reading = false;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
addChunk(stream, state, chunk, false);
else
addChunk(stream, state, chunk, true);
} else if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed) {
return false;
maybeReadMore(stream, state);
} else {
state.reading = false;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
addChunk(stream, state, chunk, false);
else
maybeReadMore(stream, state);
} else {
addChunk(stream, state, chunk, false);
}
addChunk(stream, state, chunk, false);
}
} else if (!addToFront) {
state.reading = false;
maybeReadMore(stream, state);
}
} else if (!addToFront) {
state.reading = false;
maybeReadMore(stream, state);
}

// We can push more data if we are below the highWaterMark.
Expand Down Expand Up @@ -306,17 +298,6 @@ function addChunk(stream, state, chunk, addToFront) {
maybeReadMore(stream, state);
}

function chunkInvalid(state, chunk) {
if (!Stream._isUint8Array(chunk) &&
typeof chunk !== 'string' &&
chunk !== undefined &&
!state.objectMode) {
return new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
}
}


Readable.prototype.isPaused = function() {
const state = this._readableState;
return state[kPaused] === true || state.flowing === false;
Expand Down

0 comments on commit 6e76752

Please sign in to comment.