Skip to content

Commit

Permalink
stream: use Array for Readable buffer
Browse files Browse the repository at this point in the history
PR-URL: #50341
  • Loading branch information
ronag committed Oct 23, 2023
1 parent 25576b5 commit 59f941e
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 132 deletions.
2 changes: 1 addition & 1 deletion benchmark/streams/readable-bigread.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ function main({ n }) {

bench.start();
for (let k = 0; k < n; ++k) {
for (let i = 0; i < 1e4; ++i)
for (let i = 0; i < 1e3; ++i)
s.push(b);
while (s.read(128));
}
Expand Down
2 changes: 1 addition & 1 deletion benchmark/streams/readable-readall.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ function main({ n }) {

bench.start();
for (let k = 0; k < n; ++k) {
for (let i = 0; i < 1e4; ++i)
for (let i = 0; i < 1e3; ++i)
s.push(b);
while (s.read());
}
Expand Down
141 changes: 112 additions & 29 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ const {
ObjectSetPrototypeOf,
Promise,
SafeSet,
Symbol,
SymbolAsyncDispose,
SymbolAsyncIterator,
Symbol,
SymbolSpecies,
TypedArrayPrototypeSet,
} = primordials;

module.exports = Readable;
Expand All @@ -51,7 +53,6 @@ const eos = require('internal/streams/end-of-stream');
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
debug = fn;
});
const BufferList = require('internal/streams/buffer_list');
const destroyImpl = require('internal/streams/destroy');
const {
getHighWaterMark,
Expand All @@ -73,6 +74,7 @@ const {
const { validateObject } = require('internal/validators');

const kState = Symbol('kState');
const FastBuffer = Buffer[SymbolSpecies];

const { StringDecoder } = require('string_decoder');
const from = require('internal/streams/from');
Expand Down Expand Up @@ -275,10 +277,8 @@ function ReadableState(options, stream, isDuplex) {
getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) :
getDefaultHighWaterMark(false);

// A linked list is used to store data chunks instead of an array because the
// linked list can remove elements from the beginning faster than
// array.shift().
this.buffer = new BufferList();
this.buffer = [];
this.bufferIndex = 0;
this.length = 0;
this.pipes = [];

Expand Down Expand Up @@ -546,10 +546,15 @@ function addChunk(stream, state, chunk, addToFront) {
} else {
// Update the buffer info.
state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
if (addToFront)
state.buffer.unshift(chunk);
else
if (addToFront) {
if (state.bufferIndex > 0) {
state.buffer[--state.bufferIndex] = chunk;
} else {
state.buffer.unshift(chunk); // Slow path
}
} else {
state.buffer.push(chunk);
}

if ((state[kState] & kNeedReadable) !== 0)
emitReadable(stream);
Expand All @@ -564,21 +569,24 @@ Readable.prototype.isPaused = function() {

// Backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
const state = this._readableState;

const decoder = new StringDecoder(enc);
this._readableState.decoder = decoder;
state.decoder = decoder;
// If setEncoding(null), decoder.encoding equals utf8.
this._readableState.encoding = this._readableState.decoder.encoding;
state.encoding = state.decoder.encoding;

const buffer = this._readableState.buffer;
// Iterate over current buffer to convert already stored Buffers:
let content = '';
for (const data of buffer) {
for (const data of state.buffer.slice(state.bufferIndex)) {
content += decoder.write(data);
}
buffer.clear();
state.buffer.length = 0;
state.bufferIndex = 0;

if (content !== '')
buffer.push(content);
this._readableState.length = content.length;
state.buffer.push(content);
state.length = content.length;
return this;
};

Expand Down Expand Up @@ -611,7 +619,7 @@ function howMuchToRead(n, state) {
if (NumberIsNaN(n)) {
// Only flow one buffer at a time.
if ((state[kState] & kFlowing) !== 0 && state.length)
return state.buffer.first().length;
return state.buffer[state.bufferIndex].length;
return state.length;
}
if (n <= state.length)
Expand Down Expand Up @@ -1549,21 +1557,96 @@ function fromList(n, state) {
if (state.length === 0)
return null;

let idx = state.bufferIndex;
let ret;
if (state.objectMode)
ret = state.buffer.shift();
else if (!n || n >= state.length) {

const buf = state.buffer;
const len = buf.length;

if ((state[kState] & kObjectMode) !== 0) {
ret = buf[idx];
buf[idx++] = null;
} else if (!n || n >= state.length) {
// Read it all, truncate the list.
if (state.decoder)
ret = state.buffer.join('');
else if (state.buffer.length === 1)
ret = state.buffer.first();
else
ret = state.buffer.concat(state.length);
state.buffer.clear();
if ((state[kState] & kDecoder) !== 0) {
ret = '';
while (idx < len) {
ret += buf[idx];
buf[idx++] = null;
}
} else if (len - idx === 0) {
ret = Buffer.alloc(0);
} else if (len - idx === 1) {
ret = buf[idx];
buf[idx++] = null;
} else {
ret = Buffer.allocUnsafe(state.length);

let i = 0;
while (idx < len) {
TypedArrayPrototypeSet(ret, buf[idx], i);
i += buf[idx].length;
buf[idx++] = null;
}
}
} else if (n < buf[idx].length) {
// `slice` is the same for buffers and strings.
ret = buf[idx].slice(0, n);
buf[idx] = buf[idx].slice(n);
} else if (n === buf[idx].length) {
// First chunk is a perfect match.
ret = buf[idx];
buf[idx++] = null;
} else if ((state[kState] & kDecoder) !== 0) {
ret = '';
while (idx < len) {
const str = buf[idx];
if (n > str.length) {
ret += str;
n -= str.length;
buf[idx++] = null;
} else {
if (n === buf.length) {
ret += str;
buf[idx++] = null;
} else {
ret += str.slice(0, n);
buf[idx] = str.slice(n);
}
break;
}
}
} else {
ret = Buffer.allocUnsafe(n);

const retLen = n;
while (idx < len) {
const data = buf[idx];
if (n > data.length) {
TypedArrayPrototypeSet(ret, data, retLen - n);
n -= data.length;
buf[idx++] = null;
} else {
if (n === data.length) {
TypedArrayPrototypeSet(ret, data, retLen - n);
buf[idx++] = null;
} else {
TypedArrayPrototypeSet(ret, new FastBuffer(data.buffer, data.byteOffset, n), retLen - n);
buf[idx] = new FastBuffer(data.buffer, data.byteOffset + n, data.length - n);
}
break;
}
}
}

if (idx === buf.length) {
state.buffer.length = 0;
state.bufferIndex = 0;
} else if (idx > 1024) {
state.buffer.splice(0, idx);
state.bufferIndex = 0;
} else {
// read part of list.
ret = state.buffer.consume(n, state.decoder);
state.bufferIndex = idx;
}

return ret;
Expand Down
101 changes: 0 additions & 101 deletions test/parallel/test-stream2-readable-from-list.js

This file was deleted.

0 comments on commit 59f941e

Please sign in to comment.