Skip to content

Commit

Permalink
streams: use Array for Readable buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Oct 23, 2023
1 parent 25576b5 commit 2944cda
Showing 1 changed file with 108 additions and 22 deletions.
130 changes: 108 additions & 22 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const {
const { validateObject } = require('internal/validators');

const kState = Symbol('kState');
const FastBuffer = Buffer[Symbol.species];

const { StringDecoder } = require('string_decoder');
const from = require('internal/streams/from');
Expand Down Expand Up @@ -278,7 +279,8 @@ function ReadableState(options, stream, isDuplex) {
// A linked list is used to store data chunks instead of an array because the
// linked list can remove elements from the beginning faster than
// array.shift().
this.buffer = new BufferList();
this.buffer = [];
this.bufferIndex = 0;
this.length = 0;
this.pipes = [];

Expand Down Expand Up @@ -546,10 +548,15 @@ function addChunk(stream, state, chunk, addToFront) {
} else {
// Update the buffer info.
state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
if (addToFront)
state.buffer.unshift(chunk);
else
if (addToFront) {
if (state.bufferIndex > 0) {
state.buffer[--state.bufferIndex] = chunk;
} else {
state.buffer.unshift(chunk); // Slow path
}
} else {
state.buffer.push(chunk);
}

if ((state[kState] & kNeedReadable) !== 0)
emitReadable(stream);
Expand All @@ -564,21 +571,24 @@ Readable.prototype.isPaused = function() {

// Backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
const state = this._readableState;

const decoder = new StringDecoder(enc);
this._readableState.decoder = decoder;
state.decoder = decoder;
// If setEncoding(null), decoder.encoding equals utf8.
this._readableState.encoding = this._readableState.decoder.encoding;
state.encoding = state.decoder.encoding;

const buffer = this._readableState.buffer;
// Iterate over current buffer to convert already stored Buffers:
let content = '';
for (const data of buffer) {
for (const data of state.buffer.slice(state.bufferIndex)) {
content += decoder.write(data);
}
buffer.clear();
state.buffer.length = 0;
state.bufferIndex = 0;

if (content !== '')
buffer.push(content);
this._readableState.length = content.length;
state.length = content.length;
return this;
};

Expand Down Expand Up @@ -611,7 +621,7 @@ function howMuchToRead(n, state) {
if (NumberIsNaN(n)) {
// Only flow one buffer at a time.
if ((state[kState] & kFlowing) !== 0 && state.length)
return state.buffer.first().length;
return state.buffer[state.bufferIndex].length;
return state.length;
}
if (n <= state.length)
Expand Down Expand Up @@ -1550,20 +1560,96 @@ function fromList(n, state) {
return null;

let ret;
if (state.objectMode)
ret = state.buffer.shift();
else if (!n || n >= state.length) {
if ((state[kState] & kObjectMode) !== 0) {
ret = state.buffer[state.bufferIndex++];
} else if (!n || n >= state.length) {
// Read it all, truncate the list.
if (state.decoder)
ret = state.buffer.join('');
else if (state.buffer.length === 1)
ret = state.buffer.first();
else
ret = state.buffer.concat(state.length);
state.buffer.clear();
if ((state[kState] & kDecoder) !== 0) {
ret = ''
for (let n = state.bufferIndex; n < state.buffer.length; n++) {
ret += state.buffer[n];
}
} else if (state.buffer.length - state.bufferIndex === 0) {
ret = Buffer.alloc(0)
} else if (state.buffer.length - state.bufferIndex === 1) {
ret = state.buffer[state.bufferIndex];
} else {
ret = Buffer.allocUnsafe(n >>> 0);
let i = 0;
for (let n = state.bufferIndex; n < state.buffer.length; n++) {
const data = state.buffer[n];
ret.set(data, i);
i += data.length;
}
}
state.buffer.length = 0;
state.bufferIndex = 0;
} else {
// read part of list.
ret = state.buffer.consume(n, state.decoder);

const data = state.buffer[state.bufferIndex];

if (n < data.length) {
// `slice` is the same for buffers and strings.
const slice = data.slice(0, n);
state.buffer[state.bufferIndex] = data.slice(n);
return slice;
}

if (n === data.length) {
// First chunk is a perfect match.
return state.buffer[state.bufferIndex++];
}

if ((state[kState] & kDecoder) !== 0) {
ret = '';
while (state.bufferIndex < state.buffer.length) {
const str = state.buffer[state.bufferIndex];
if (n > str.length) {
ret += str;
n -= str.length;
state.bufferIndex++;
} else {
if (n === buf.length) {
ret += str;
state.bufferIndex++;
} else {
ret += str.slice(0, n);
state.buffer[state.bufferIndex] = str.slice(n);
}
break;
}
}
} else {
ret = Buffer.allocUnsafe(n);

const retLen = n;
while (state.bufferIndex < state.buffer.length) {
const buf = state.buffer[state.bufferIndex];
if (n > buf.length) {
ret.set(buf, retLen - n);
n -= buf.length;
state.bufferIndex++;
} else {
if (n === buf.length) {
ret.set(buf, retLen - n);
state.bufferIndex++;
} else {
ret.set(new FastBuffer(buf.buffer, buf.byteOffset, n), retLen - n);
state.buffer[state.bufferIndex] = new FastBuffer(buf.buffer, buf.byteOffset + n);
}
break;
}
}
}

if (state.bufferIndex === state.buffer.length) {
state.buffer.length = 0;
state.bufferIndex = 0
} else if (state.bufferIndex > 256) {
state.buffer = state.buffer.slice(state.bufferIndex);
state.bufferIndex = 0;
}
}

return ret;
Expand Down

0 comments on commit 2944cda

Please sign in to comment.