diff --git a/lib/internal/fixed_queue.js b/lib/internal/fixed_queue.js index 4019d6e6dc46fa..43679589e094f3 100644 --- a/lib/internal/fixed_queue.js +++ b/lib/internal/fixed_queue.js @@ -2,6 +2,8 @@ const { Array, + SymbolIterator, + TypedArrayPrototypeSet, } = primordials; // Currently optimal queue size, tested on V8 6.0 - 6.6. Must be power of two. @@ -76,7 +78,13 @@ class FixedCircularBuffer { this.list[this.top] = data; this.top = (this.top + 1) & kMask; } - + first() { + return this.list[this.bottom] ?? null; + } + unshift(data) { + this.bottom = (this.bottom - 1) & kMask; + this.list[this.bottom] = data; + } shift() { const nextItem = this.list[this.bottom]; if (nextItem === undefined) @@ -90,12 +98,12 @@ class FixedCircularBuffer { module.exports = class FixedQueue { constructor() { this.head = this.tail = new FixedCircularBuffer(); + this.length = 0; } isEmpty() { return this.head.isEmpty(); } - push(data) { if (this.head.isFull()) { // Head is full: Creates a new queue, sets the old queue's `.next` to it, @@ -103,8 +111,115 @@ module.exports = class FixedQueue { this.head = this.head.next = new FixedCircularBuffer(); } this.head.push(data); + this.length += data.length; + } + clear() { + this.head = this.tail = new FixedCircularBuffer(); + this.length = 0; + } + unshift(data) { + if (this.tail.isFull()) { + // Tail is full: Creates a new queue, and put it at the start + // of the list. + const newTail = new FixedCircularBuffer(); + newTail.next = this.tail; + this.tail = newTail; + } + this.tail.unshift(data); + this.length += data.length; + } + join() { + let ret = ''; + for (let p = this.tail; p !== null; p = p.next) { + for (let i = p.bottom; i !== p.top; i = (i + 1) & kMask) { + ret += p.list[i]; + } + } + return ret; + } + concat(n) { + const ret = Buffer.allocUnsafe(n >>> 0); + let p = this.tail; + let i = 0; + while (p) { + let slice; + if (p.bottom < p.top) { + slice = p.list.slice(p.bottom, p.top); + } else { + slice = p.list.slice(p.bottom).concat(p.list.slice(0, p.top)); + } + TypedArrayPrototypeSet(ret, slice, i); + i += slice.length; + p = p.next; + } + return ret; + } + consume(n, hasStrings) { + let ret; + // n < this.tail.top - this.tail.bottom but accountng to othe fact + // bottom might be more than top with kMask + if (n < ((this.tail.top - this.tail.bottom) & kMask)) { + // This slice is in the first buffer. Read it accounting for the fact + // that bottom might be more than top with kMask. + if (this.tail.bottom < this.tail.top) { + ret = this.tail.list.slice(this.tail.bottom, this.tail.bottom + n); + } else { + ret = this.tail.list.slice(this.tail.bottom).concat(this.tail.list.slice(0, n)); + } + this.tail.bottom = (this.tail.bottom + n) & kMask; + } else if (n === ((this.tail.top - this.tail.bottom) & kMask)) { + // This slice spans the first buffer exactly. + if (this.tail.bottom < this.tail.top) { + ret = this.tail.list.slice(this.tail.bottom, this.tail.top); + } else { + ret = this.tail.list.slice(this.tail.bottom) + .concat(this.tail.list.slice(0, n)); + } + this.tail = this.tail.next; + } else { + // This slice spans multiple buffers. + ret = Buffer.allocUnsafe(n); + let i = 0; + let p = this.tail; + while (n > 0) { + let slice; + // account for the fact n might be more than top - bottom with kMask + if (n < ((p.top - p.bottom) & kMask)) { + slice = p.list.slice(p.bottom, p.bottom + n); + p.bottom = (p.bottom + n) & kMask; + n = 0; + } + if (p.bottom < p.top) { + slice = p.list.slice(p.bottom, p.top); + } else { + slice = p.list.slice(p.bottom).concat(p.list.slice(0, p.top)); + } + const length = Math.min(n, slice.length); + TypedArrayPrototypeSet(ret, slice.slice(0, length), i); + i += length; + n -= length; + if (!p.next) { + break; + } + p = p.next; + } + } + this.length -= ret.length; + if (hasStrings) { + ret = ret.join(''); + } + return ret; + } + *[SymbolIterator]() { + for (let p = this.tail; p !== null; p = p.next) { + for (let i = p.bottom; i !== p.top; i = (i + 1) & kMask) { + yield p.list[i]; + } + } + } + first() { + return this.tail.first(); } - shift() { const tail = this.tail; const next = tail.shift(); @@ -113,6 +228,7 @@ module.exports = class FixedQueue { this.tail = tail.next; tail.next = null; } + this.length -= next?.length; return next; } }; diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 1b40192d9458ba..5eabc80f6639a7 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -51,7 +51,7 @@ const eos = require('internal/streams/end-of-stream'); let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { debug = fn; }); -const BufferList = require('internal/streams/buffer_list'); +const FixedQueue = require('internal/fixed_queue'); const destroyImpl = require('internal/streams/destroy'); const { getHighWaterMark, @@ -106,10 +106,9 @@ function ReadableState(options, stream, isDuplex) { getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) : getDefaultHighWaterMark(false); - // A linked list is used to store data chunks instead of an array because the - // linked list can remove elements from the beginning faster than - // array.shift(). - this.buffer = new BufferList(); + // Use a chain of deques so we can remove elements quickly but still + // have good ram cache locality. + this.buffer = new FixedQueue(); this.length = 0; this.pipes = []; this.flowing = null; @@ -269,7 +268,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { if (state.encoding !== encoding) { if (addToFront && state.encoding) { // When unshifting, if state.encoding is set, we have to save - // the string in the BufferList with the state encoding. + // the string in the FixedQueue with the state encoding. chunk = Buffer.from(chunk, encoding).toString(state.encoding); } else { chunk = Buffer.from(chunk, encoding); @@ -1342,7 +1341,7 @@ function fromList(n, state) { else if (!n || n >= state.length) { // Read it all, truncate the list. if (state.decoder) - ret = state.buffer.join(''); + ret = state.buffer.join(); else if (state.buffer.length === 1) ret = state.buffer.first(); else