Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: change bufferlist to fixedqueue #49754

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 119 additions & 3 deletions lib/internal/fixed_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const {
Array,
SymbolIterator,
TypedArrayPrototypeSet,
} = primordials;

// Currently optimal queue size, tested on V8 6.0 - 6.6. Must be power of two.
Expand Down Expand Up @@ -76,7 +78,13 @@ class FixedCircularBuffer {
this.list[this.top] = data;
this.top = (this.top + 1) & kMask;
}

first() {
return this.list[this.bottom] ?? null;
}
unshift(data) {
this.bottom = (this.bottom - 1) & kMask;
this.list[this.bottom] = data;
}
shift() {
const nextItem = this.list[this.bottom];
if (nextItem === undefined)
Expand All @@ -90,21 +98,128 @@ class FixedCircularBuffer {
module.exports = class FixedQueue {
constructor() {
this.head = this.tail = new FixedCircularBuffer();
this.length = 0;
}

isEmpty() {
return this.head.isEmpty();
}

push(data) {
if (this.head.isFull()) {
// Head is full: Creates a new queue, sets the old queue's `.next` to it,
// and sets it as the new main queue.
this.head = this.head.next = new FixedCircularBuffer();
}
this.head.push(data);
this.length += data.length;
}
clear() {
this.head = this.tail = new FixedCircularBuffer();
this.length = 0;
}
unshift(data) {
if (this.tail.isFull()) {
// Tail is full: Creates a new queue, and put it at the start
// of the list.
const newTail = new FixedCircularBuffer();
newTail.next = this.tail;
this.tail = newTail;
}
this.tail.unshift(data);
this.length += data.length;
}
join() {
let ret = '';
for (let p = this.tail; p !== null; p = p.next) {
for (let i = p.bottom; i !== p.top; i = (i + 1) & kMask) {
ret += p.list[i];
}
}
return ret;
}
concat(n) {
const ret = Buffer.allocUnsafe(n >>> 0);
let p = this.tail;
let i = 0;
while (p) {
let slice;
if (p.bottom < p.top) {
slice = p.list.slice(p.bottom, p.top);
} else {
slice = p.list.slice(p.bottom).concat(p.list.slice(0, p.top));
}
TypedArrayPrototypeSet(ret, slice, i);
i += slice.length;
p = p.next;
}
return ret;
}
consume(n, hasStrings) {
let ret;
// n < this.tail.top - this.tail.bottom but accountng to othe fact
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// n < this.tail.top - this.tail.bottom but accountng to othe fact
// n < this.tail.top - this.tail.bottom but accounting for other fact

// bottom might be more than top with kMask
if (n < ((this.tail.top - this.tail.bottom) & kMask)) {
// This slice is in the first buffer. Read it accounting for the fact
// that bottom might be more than top with kMask.
if (this.tail.bottom < this.tail.top) {
ret = this.tail.list.slice(this.tail.bottom, this.tail.bottom + n);
} else {
ret = this.tail.list.slice(this.tail.bottom).concat(this.tail.list.slice(0, n));
}
this.tail.bottom = (this.tail.bottom + n) & kMask;
} else if (n === ((this.tail.top - this.tail.bottom) & kMask)) {
// This slice spans the first buffer exactly.
if (this.tail.bottom < this.tail.top) {
ret = this.tail.list.slice(this.tail.bottom, this.tail.top);
} else {
ret = this.tail.list.slice(this.tail.bottom)
.concat(this.tail.list.slice(0, n));
}
this.tail = this.tail.next;
} else {
// This slice spans multiple buffers.
ret = Buffer.allocUnsafe(n);
let i = 0;
let p = this.tail;
while (n > 0) {
let slice;
// account for the fact n might be more than top - bottom with kMask
if (n < ((p.top - p.bottom) & kMask)) {
slice = p.list.slice(p.bottom, p.bottom + n);
p.bottom = (p.bottom + n) & kMask;
n = 0;
}
if (p.bottom < p.top) {
slice = p.list.slice(p.bottom, p.top);
} else {
slice = p.list.slice(p.bottom).concat(p.list.slice(0, p.top));
}
const length = Math.min(n, slice.length);
TypedArrayPrototypeSet(ret, slice.slice(0, length), i);
i += length;
n -= length;
if (!p.next) {
break;
}
p = p.next;
}
}
this.length -= ret.length;
if (hasStrings) {
ret = ret.join('');
}
return ret;
}
*[SymbolIterator]() {
for (let p = this.tail; p !== null; p = p.next) {
for (let i = p.bottom; i !== p.top; i = (i + 1) & kMask) {
yield p.list[i];
}
}
}
first() {
return this.tail.first();
}

shift() {
const tail = this.tail;
const next = tail.shift();
Expand All @@ -113,6 +228,7 @@ module.exports = class FixedQueue {
this.tail = tail.next;
tail.next = null;
}
this.length -= next?.length;
return next;
}
};
13 changes: 6 additions & 7 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const eos = require('internal/streams/end-of-stream');
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
debug = fn;
});
const BufferList = require('internal/streams/buffer_list');
const FixedQueue = require('internal/fixed_queue');
const destroyImpl = require('internal/streams/destroy');
const {
getHighWaterMark,
Expand Down Expand Up @@ -106,10 +106,9 @@ function ReadableState(options, stream, isDuplex) {
getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) :
getDefaultHighWaterMark(false);

// A linked list is used to store data chunks instead of an array because the
// linked list can remove elements from the beginning faster than
// array.shift().
this.buffer = new BufferList();
// Use a chain of deques so we can remove elements quickly but still
// have good ram cache locality.
this.buffer = new FixedQueue();
this.length = 0;
this.pipes = [];
this.flowing = null;
Expand Down Expand Up @@ -269,7 +268,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
if (state.encoding !== encoding) {
if (addToFront && state.encoding) {
// When unshifting, if state.encoding is set, we have to save
// the string in the BufferList with the state encoding.
// the string in the FixedQueue with the state encoding.
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
} else {
chunk = Buffer.from(chunk, encoding);
Expand Down Expand Up @@ -1342,7 +1341,7 @@ function fromList(n, state) {
else if (!n || n >= state.length) {
// Read it all, truncate the list.
if (state.decoder)
ret = state.buffer.join('');
ret = state.buffer.join();
else if (state.buffer.length === 1)
ret = state.buffer.first();
else
Expand Down
Loading