From b8c34556890e9dd9f9811d0d3fccec702e74b25f Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Wed, 18 Jan 2017 12:25:34 -0500 Subject: [PATCH] Fix queue total size tracking logic Previously, we believed that the spec's inefficient-but-clear method, of adding up all the chunk sizes whenever it wanted to get the total queue size, was equivalent to keeping a running count. This was discussed in #582; note also #491 where we changed the reference implementation to the running-count strategy for speed reasons. As discovered in https://github.com/whatwg/streams/issues/582#issuecomment-273016992, and further elaborated on in #657, these two methods are not in fact equivalent, due to the vagaries of floating-point arithmetic. (That is, they are equivalent if you assume the addition is floating-point addition, but that is less than clear from the spec.) As such, this commit switches the spec to the more efficient running-count method, as that's realistically the only implementable version. It also adds tests to ensure floating point arithmetic is being used, exposing such cases. This commit also includes a few unobservable cleanups: - It introduces the ResetQueue abstract operation, to ensure that the queue total size gets reset to zero when the queue is cleared. This should not matter because no code paths check the queue's total size after it has been cleared, but keeping the two slots in sync seems virtuous. - It updates the internal slot name for ReadableByteStreamController instances from [[totalQueuedBytes]] to [[queueTotalSize]], to be consistent with those for the other queue-containers. We do not yet use the queue-with-sizes abstract operations (except ResetQueue) on ReadableByteStreamController instances as the queue management is significantly more complicated there. But aligning makes the spec easier to read. --- index.bs | 143 ++++++++++-------- .../lib/queue-with-sizes.js | 46 +++--- .../lib/readable-stream.js | 53 ++++--- .../lib/writable-stream.js | 26 ++-- ...floating-point-total-queue-size.https.html | 14 ++ .../floating-point-total-queue-size.js | 86 +++++++++++ ...floating-point-total-queue-size.https.html | 14 ++ .../floating-point-total-queue-size.js | 69 +++++++++ 8 files changed, 331 insertions(+), 120 deletions(-) create mode 100644 reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.https.html create mode 100644 reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.js create mode 100644 reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.https.html create mode 100644 reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.js diff --git a/index.bs b/index.bs index a75dfbae5..dc1facf2b 100644 --- a/index.bs +++ b/index.bs @@ -30,6 +30,7 @@ urlPrefix: https://tc39.github.io/ecma262/; spec: ECMASCRIPT text: %Uint8Array%; url: #sec-typedarray-objects; type: constructor text: ArrayBuffer; url: #sec-arraybuffer-objects; type: interface text: DataView; url: #sec-dataview-objects; type: interface + text: Number; url: #sec-ecmascript-language-types-number-type; type: interface text: Uint8Array; url: #sec-typedarray-objects; type: interface text: typed array; url: #sec-typedarray-objects; type: dfn text: the typed array constructors table; url: #table-49; type: dfn @@ -1467,6 +1468,10 @@ Instances of {{ReadableStreamDefaultController}} are created with the internal s \[[queue]] A List representing the stream's internal queue of chunks + + \[[queueTotalSize]] + The total size of all the chunks stored in \[[queue]] (see [[#queue-with-sizes]]) + \[[started]] A boolean flag indicating whether the underlying source has finished starting @@ -1503,7 +1508,7 @@ ReadableStreamDefaultController(stream, underlyingSource, 1. If _stream_.[[readableStreamController]] is not *undefined*, throw a *TypeError* exception. 1. Set *this*.[[controlledReadableStream]] to _stream_. 1. Set *this*.[[underlyingSource]] to _underlyingSource_. - 1. Set *this*.[[queue]] to a new empty List. + 1. Perform ! ResetQueue(*this*). 1. Set *this*.[[started]], *this*.[[closeRequested]], *this*.[[pullAgain]], and *this*.[[pulling]] to *false*. 1. Let _normalizedStrategy_ be ? ValidateAndNormalizeQueuingStrategy(_size_, _highWaterMark_). 1. Set *this*.[[strategySize]] to _normalizedStrategy_.[[size]] and *this*.[[strategyHWM]] to @@ -1586,7 +1591,7 @@ polymorphic dispatch from the readable stream implementation to either these or
\[[Cancel]](reason)
- 1. Set *this*.[[queue]] to a new empty List. + 1. Perform ! ResetQueue(*this*). 1. Return ! PromiseInvokeOrNoop(*this*.[[underlyingSource]], `"cancel"`, « _reason_ ») @@ -1595,7 +1600,7 @@ polymorphic dispatch from the readable stream implementation to either these or 1. Let _stream_ be *this*.[[controlledReadableStream]]. 1. If *this*.[[queue]] is not empty, - 1. Let _chunk_ be ! DequeueValue(*this*.[[queue]]). + 1. Let _chunk_ be ! DequeueValue(*this*). 1. If *this*.[[closeRequested]] is *true* and *this*.[[queue]] is empty, perform ! ReadableStreamClose(_stream_). 1. Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(*this*). 1. Return a promise resolved with ! CreateIterResultObject(_chunk_, *false*). @@ -1688,7 +1693,7 @@ asserts). 1. Perform ! ReadableStreamDefaultControllerErrorIfNeeded(_controller_, _chunkSize_.[[Value]]). 1. Return _chunkSize_. 1. Let _chunkSize_ be _chunkSize_.[[Value]]. - 1. Let _enqueueResult_ be ! EnqueueValueWithSize(_controller_.[[queue]], _chunk_, _chunkSize_). + 1. Let _enqueueResult_ be ! EnqueueValueWithSize(_controller_, _chunk_, _chunkSize_). 1. If _enqueueResult_ is an abrupt completion, 1. Perform ! ReadableStreamDefaultControllerErrorIfNeeded(_controller_, _enqueueResult_.[[Value]]). 1. Return _enqueueResult_. @@ -1714,7 +1719,7 @@ an assert). 1. Let _stream_ be _controller_.[[controlledReadableStream]]. 1. Assert: _stream_.[[state]] is `"readable"`. - 1. Set _controller_.[[queue]] to a new empty List. + 1. Perform ! ResetQueue(_controller_). 1. Perform ! ReadableStreamError(_stream_, _e_). @@ -1735,8 +1740,7 @@ the {{ReadableStreamDefaultController/desiredSize}} property of the stream's ass Specifications should not use this on streams they did not create. - 1. Let _queueSize_ be ! GetTotalQueueSize(_controller_.[[queue]]). - 1. Return _controller_.[[strategyHWM]] − _queueSize_. + 1. Return _controller_.[[strategyHWM]] − _controller_.[[queueTotalSize]].

Class @@ -1814,6 +1818,10 @@ Instances of {{ReadableByteStreamController}} are created with the internal slot \[[queue]] A List representing the stream's internal queue of chunks + + \[[queueTotalSize]] + The total size (in bytes) of all the chunks stored in \[[queue]] + \[[started]] A boolean flag indicating whether the underlying source has finished starting @@ -1823,10 +1831,6 @@ Instances of {{ReadableByteStreamController}} are created with the internal slot A number supplied to the constructor as part of the stream's queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source - - \[[totalQueuedBytes]] - The number of bytes stored in \[[queue]] - \[[underlyingByteSource]] An object representation of the stream's underlying byte source; @@ -1834,6 +1838,14 @@ Instances of {{ReadableByteStreamController}} are created with the internal slot +
+

Although {{ReadableByteStreamController}} instances have \[[queue]] and \[[queueTotalSize]] slots, we do not use + most of the abstract operations in [[#queue-with-sizes]] on them, as the way in which we manipulate this queue is + rather different than the others in the spec. Instead, we update the two slots together manually.

+ +

This might be cleaned up in a future spec refactoring.

+
+

new ReadableByteStreamController(stream, underlyingByteSource, highWaterMark)

@@ -1850,9 +1862,8 @@ ReadableByteStreamController(stream, underlyingByteSource, 1. Set *this*.[[underlyingByteSource]] to _underlyingByteSource_. 1. Set *this*.[[pullAgain]], and *this*.[[pulling]] to *false*. 1. Perform ! ReadableByteStreamControllerClearPendingPullIntos(*this*). - 1. Set *this*.[[queue]] to a new empty List. - 1. Set *this*.[[totalQueuedBytes]] to *0*. - 1. Set *this*.[[started]], and *this*.[[closeRequested]] to *false*. + 1. Perform ! ResetQueue(*this*). + 1. Set *this*.[[started]] and *this*.[[closeRequested]] to *false*. 1. Set *this*.[[strategyHWM]] to ? ValidateAndNormalizeHighWaterMark(_highWaterMark_). 1. Let _autoAllocateChunkSize_ be ? GetV(_underlyingByteSource_, `"autoAllocateChunkSize"`). 1. If _autoAllocateChunkSize_ is not *undefined*, @@ -1958,8 +1969,7 @@ dispatch from the readable stream implementation to either these or their counte 1. If *this*.[[pendingPullIntos]] is not empty, 1. Let _firstDescriptor_ be the first element of *this*.[[pendingPullIntos]]. 1. Set _firstDescriptor_.[[bytesFilled]] to *0*. - 1. Set *this*.[[queue]] to a new empty List. - 1. Set *this*.[[totalQueuedBytes]] to *0*. + 1. Perform ! ResetQueue(*this*). 1. Return ! PromiseInvokeOrNoop(*this*.[[underlyingByteSource]], `"cancel"`, « _reason_ »)
@@ -1969,11 +1979,11 @@ dispatch from the readable stream implementation to either these or their counte 1. Let _stream_ be *this*.[[controlledReadableStream]]. 1. Assert: ! ReadableStreamHasDefaultReader(_stream_) is *true*. 1. If ! ReadableStreamGetNumReadRequests(_stream_) is *0*, - 1. If *this*.[[totalQueuedBytes]] > *0*, + 1. If *this*.[[queueTotalSize]] > *0*, 1. Let _entry_ be the first element of *this*.[[queue]]. 1. Remove _entry_ from *this*.[[queue]], shifting all other elements downward (so that the second becomes the first, and so on). - 1. Set *this*.[[totalQueuedBytes]] to *this*.[[totalQueuedBytes]] − _entry_.[[byteLength]]. + 1. Set *this*.[[queueTotalSize]] to *this*.[[queueTotalSize]] − _entry_.[[byteLength]]. 1. Perform ! ReadableByteStreamControllerHandleQueueDrain(*this*). 1. Let _view_ be ! Construct(%Uint8Array%, « _entry_.[[buffer]], _entry_.[[byteOffset]], _entry_.[[byteLength]] »). @@ -2133,7 +2143,7 @@ throws>ReadableByteStreamControllerClose ( controller ) 1. Let _stream_ be _controller_.[[controlledReadableStream]]. 1. Assert: _controller_.[[closeRequested]] is *false*. 1. Assert: _stream_.[[state]] is `"readable"`. - 1. If _controller_.[[totalQueuedBytes]] > *0*, + 1. If _controller_.[[queueTotalSize]] > *0*, 1. Set _controller_.[[closeRequested]] to *true*. 1. Return. 1. If _controller_.[[pendingPullIntos]] is not empty, @@ -2213,7 +2223,7 @@ nothrow>ReadableByteStreamControllerEnqueueChunkToQueue ( controller, 1. Append Record {[[buffer]]: _buffer_, [[byteOffset]]: _byteOffset_, [[byteLength]]: _byteLength_} as the last element of _controller_.[[queue]]. - 1. Add _byteLength_ to _controller_.[[totalQueuedBytes]]. + 1. Add _byteLength_ to _controller_.[[queueTotalSize]].

ReadableByteStreamControllerError ( controller, e 1. Let _stream_ be _controller_.[[controlledReadableStream]]. 1. Assert: _stream_.[[state]] is `"readable"`. 1. Perform ! ReadableByteStreamControllerClearPendingPullIntos(_controller_). - 1. Let _controller_.[[queue]] be a new empty List. + 1. Perform ! ResetQueue(_controller_). 1. Perform ! ReadableStreamError(_stream_, _e_). @@ -2248,7 +2258,7 @@ nothrow>ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( contr 1. Let _elementSize_ be _pullIntoDescriptor_.[[elementSize]]. 1. Let _currentAlignedBytes_ be _pullIntoDescriptor_.[[bytesFilled]] − (_pullIntoDescriptor_.[[bytesFilled]] mod _elementSize_). - 1. Let _maxBytesToCopy_ be min(_controller_.[[totalQueuedBytes]], _pullIntoDescriptor_.[[byteLength]] − + 1. Let _maxBytesToCopy_ be min(_controller_.[[queueTotalSize]], _pullIntoDescriptor_.[[byteLength]] − _pullIntoDescriptor_.[[bytesFilled]]). 1. Let _maxBytesFilled_ be _pullIntoDescriptor_.[[bytesFilled]] + _maxBytesToCopy_. 1. Let _maxAlignedBytes_ be _maxBytesFilled_ − (_maxBytesFilled_ mod _elementSize_). @@ -2270,12 +2280,12 @@ nothrow>ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( contr 1. Otherwise, 1. Set _headOfQueue_.[[byteOffset]] to _headOfQueue_.[[byteOffset]] + _bytesToCopy_. 1. Set _headOfQueue_.[[byteLength]] to _headOfQueue_.[[byteLength]] − _bytesToCopy_. - 1. Set _controller_.[[totalQueuedBytes]] to _controller_.[[totalQueuedBytes]] − _bytesToCopy_. + 1. Set _controller_.[[queueTotalSize]] to _controller_.[[queueTotalSize]] − _bytesToCopy_. 1. Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(_controller_, _bytesToCopy_, _pullIntoDescriptor_). 1. Set _totalBytesToCopyRemaining_ to _totalBytesToCopyRemaining_ − _bytesToCopy_. 1. If _ready_ is *false*, - 1. Assert: _controller_.[[totalQueuedBytes]] is *0*. + 1. Assert: _controller_.[[queueTotalSize]] is *0*. 1. Assert: _pullIntoDescriptor_.[[bytesFilled]] > *0*. 1. Assert: _pullIntoDescriptor_.[[bytesFilled]] < _pullIntoDescriptor_.[[elementSize]]. 1. Return _ready_. @@ -2285,7 +2295,7 @@ nothrow>ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( contr nothrow>ReadableByteStreamControllerGetDesiredSize ( controller )

- 1. Return _controller_.[[strategyHWM]] − _controller_.[[totalQueuedBytes]]. + 1. Return _controller_.[[strategyHWM]] − _controller_.[[queueTotalSize]].

ReadableByteStreamControllerHandleQueueDrain ( controller ) 1. Assert: _controller_.[[controlledReadableStream]].[[state]] is `"readable"`. - 1. If _controller_.[[totalQueuedBytes]] is *0* and _controller_.[[closeRequested]] is *true*, + 1. If _controller_.[[queueTotalSize]] is *0* and _controller_.[[closeRequested]] is *true*, 1. Perform ! ReadableStreamClose(_controller_.[[controlledReadableStream]]). 1. Otherwise, 1. Perform ! ReadableByteStreamControllerCallPullIfNeeded(_controller_). @@ -2317,7 +2327,7 @@ nothrow>ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue ( 1. Assert: _controller_.[[closeRequested]] is *false*. 1. Repeat the following steps while _controller_.[[pendingPullIntos]] is not empty, - 1. If _controller_.[[totalQueuedBytes]] is *0*, return. + 1. If _controller_.[[queueTotalSize]] is *0*, return. 1. Let _pullIntoDescriptor_ be the first element of _controller_.[[pendingPullIntos]]. 1. If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(_controller_, _pullIntoDescriptor_) is *true*, 1. Perform ! ReadableByteStreamControllerShiftPendingPullInto(_controller_). @@ -2348,7 +2358,7 @@ nothrow>ReadableByteStreamControllerPullInto ( controller, view< 1. If _stream_.[[state]] is `"closed"`, 1. Let _emptyView_ be ! Construct(_ctor_, « _pullIntoDescriptor_.[[buffer]], _pullIntoDescriptor_.[[byteOffset]], *0* »). 1. Return a promise resolved with ! CreateIterResultObject(_emptyView_, *true*). - 1. If _controller_.[[totalQueuedBytes]] > *0*, + 1. If _controller_.[[queueTotalSize]] > *0*, 1. If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(_controller_, _pullIntoDescriptor_) is *true*, 1. Let _filledView_ be ! ReadableByteStreamControllerConvertPullIntoDescriptor(_pullIntoDescriptor_). 1. Perform ! ReadableByteStreamControllerHandleQueueDrain(_controller_). @@ -3228,6 +3238,10 @@ Instances of {{WritableStreamDefaultController}} are created with the internal s \[[queue]] A List representing the stream's internal queue of chunks + + \[[queueTotalSize]] + The total size of all the chunks stored in \[[queue]] (see [[#queue-with-sizes]]) + \[[started]] A boolean flag indicating whether the underlying sink has finished starting @@ -3269,8 +3283,8 @@ WritableStreamDefaultController(stream, underlyingSink, IsWritableStreamDefaultController ( x )

nothrow>WritableStreamDefaultControllerAbort ( controller, reason ) - 1. Set _controller_.[[queue]] to a new empty List. + 1. Perform ! ResetQueue(_controller_). 1. Let _sinkAbortPromise_ be ! PromiseInvokeOrNoop(_controller_.[[underlyingSink]], `"abort"`, « _reason_ »). 1. Return the result of transforming _sinkAbortPromise_ by a fulfillment handler that returns @@ -3332,7 +3346,7 @@ nothrow>WritableStreamDefaultControllerAbort ( controller, reaso nothrow>WritableStreamDefaultControllerClose ( controller ) - 1. Perform ! EnqueueValueWithSize(_controller_.[[queue]], `"close"`, *0*). + 1. Perform ! EnqueueValueWithSize(_controller_, `"close"`, *0*). 1. Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(_controller_). @@ -3340,8 +3354,7 @@ nothrow>WritableStreamDefaultControllerClose ( controller ) nothrow>WritableStreamDefaultControllerGetDesiredSize ( controller ) - 1. Let _queueSize_ be ! GetTotalQueueSize(_controller_.[[queue]]). - 1. Return _controller_.[[strategyHWM]] − _queueSize_. + 1. Return _controller_.[[strategyHWM]] − _controller_.[[queueTotalSize]].

WritableStreamDefaultControllerWrite ( controller, chunk 1. Return. 1. Let _writeRecord_ be Record {[[chunk]]: _chunk_}. 1. Let _lastBackpressure_ be ! WritableStreamDefaultControllerGetBackpressure(_controller_). - 1. Let _enqueueResult_ be ! EnqueueValueWithSize(_controller_.[[queue]], _writeRecord_, _chunkSize_). + 1. Let _enqueueResult_ be ! EnqueueValueWithSize(_controller_, _writeRecord_, _chunkSize_). 1. If _enqueueResult_ is an abrupt completion, 1. Perform ! WritableStreamDefaultControllerErrorIfNeeded(_controller_, _enqueueResult_.[[Value]]). 1. Return. @@ -3378,7 +3391,7 @@ aoid="WritableStreamDefaultControllerAdvanceQueueIfNeeded" nothrow>WritableStrea 1. If _controller_.[[started]] is *false*, return. 1. If _controller_.[[writing]] is *true*, return. 1. If _controller_.[[queue]] is empty, return. - 1. Let _writeRecord_ be ! PeekQueueValue(_controller_.[[queue]]). + 1. Let _writeRecord_ be ! PeekQueueValue(_controller_). 1. If _writeRecord_ is `"close"`, perform WritableStreamDefaultControllerProcessClose(_controller_). 1. Otherwise, perform WritableStreamDefaultControllerProcessWrite(_controller_, _writeRecord_.[[chunk]]). @@ -3397,7 +3410,7 @@ nothrow>WritableStreamDefaultControllerProcessClose ( controller ) 1. Let _stream_ be _controller_.[[controlledWritableStream]]. 1. Assert: _stream_.[[state]] is `"closing"`. - 1. Perform ! DequeueValue(_controller_.[[queue]]). + 1. Perform ! DequeueValue(_controller_). 1. Assert: _controller_.[[queue]] is empty. 1. Set _controller_.[[inClose]] to *true*. 1. Let _sinkClosePromise_ be ! PromiseInvokeOrNoop(_controller_.[[underlyingSink]], `"close"`, « _controller_ »). @@ -3449,7 +3462,7 @@ nothrow>WritableStreamDefaultControllerProcessWrite ( controller, WritableStreamDefaultControllerError ( controller, e

Transform Streams

@@ -3651,46 +3664,55 @@ CountQueuingStrategy({ highWaterMark })

Queue-with-Sizes Operations

The streams in this specification use a "queue-with-sizes" data structure to store queued up values, along with their -determined sizes. A queue-with-sizes is a List of Records with \[[value]] and \[[size]] fields (although in -implementations it would of course be backed by a more efficient data structure). +determined sizes. Various specification objects contain a queue-with-sizes, represented by the object having two paired +internal slots, always named \[[queue]] and \[[queueTotalSize]]. \[[queue]] is a List of Records with \[[value]] and +\[[size]] fields, and \[[queueTotalSize]] is a JavaScript {{Number}}, i.e. a double-precision floating point number. -A number of abstract operations are specified here to make working with queues-with-sizes more pleasant, and used -throughout the rest of this standard. +The following abstract operations are used when operating on objects that contain queues-with-sizes, in order to ensure +that the two internal slots stay synchronized. -

DequeueValue ( queue )

+

Due to the vagaries of floating point arithmetic, the framework specified here, of keeping a running +total in the \[[queueTotalSize]] slot, is not equivalent to adding up the size of all chunks in +\[[queue]]. This is most apparent when the total size is very small or very large.

+ +

DequeueValue ( container )

- 1. Assert: _queue_ is not empty. - 1. Let _pair_ be the first element of _queue_. - 1. Remove _pair_ from _queue_, shifting all other elements downward (so that the second becomes the first, and so on). + 1. Assert: _container_ has [[queue]] and [[queueTotalSize]] internal slots. + 1. Assert: _container_.[[queue]] is not empty. + 1. Let _pair_ be the first element of _container_.[[queue]]. + 1. Remove _pair_ from _container_.[[queue]], shifting all other elements downward (so that the second becomes the + first, and so on). + 1. Set _container_.[[queueTotalSize]] to _container_.[[queueTotalSize]] − _pair_.[[size]]. 1. Return _pair_.[[value]]. -

EnqueueValueWithSize ( queue, +

EnqueueValueWithSize ( container, value, size )

+ 1. Assert: _container_ has [[queue]] and [[queueTotalSize]] internal slots. 1. Let _size_ be ? ToNumber(_size_). 1. If ! IsFiniteNonNegativeNumber(_size_) is *false*, throw a *RangeError* exception. - 1. Append Record {[[value]]: _value_, [[size]]: _size_} as the last element of _queue_. + 1. Append Record {[[value]]: _value_, [[size]]: _size_} as the last element of _container_.[[queue]]. + 1. Set _container_.[[queueTotalSize]] to _container_.[[queueTotalSize]] + _size_. -

GetTotalQueueSize ( queue )

+

PeekQueueValue ( container )

- 1. Let _totalSize_ be *0*. - 1. Repeat for each Record {[[value]], [[size]]} _pair_ that is an element of _queue_, - 1. Assert: _pair_.[[size]] is a finite, non-*NaN* number. - 1. Set _totalSize_ to _totalSize_ + _pair_.[[size]]. - 1. Return _totalSize_. + 1. Assert: _container_ has [[queue]] and [[queueTotalSize]] internal slots. + 1. Assert: _container_.[[queue]] is not empty. + 1. Let _pair_ be the first element of _container_.[[queue]]. + 1. Return _pair_.[[value]]. -

PeekQueueValue ( queue )

+

ResetQueue ( container )

- 1. Assert: _queue_ is not empty. - 1. Let _pair_ be the first element of _queue_. - 1. Return _pair_.[[value]]. + 1. Assert: _container_ has [[queue]] and [[queueTotalSize]] internal slots. + 1. Set _container_.[[queue]] to a new empty List. + 1. Set _container_.[[queueTotalSize]] to *0*.

Miscellaneous Operations

@@ -4255,6 +4277,9 @@ itself will evolve in these ways. ECMAScript spec does. +It's also worth noting that, as in [[!ECMASCRIPT]], all numbers are represented as double-precision floating point +values, and all arithmetic operations performed on them must be done in the usual way for such values. +

Acknowledgments

The editor would like to thank diff --git a/reference-implementation/lib/queue-with-sizes.js b/reference-implementation/lib/queue-with-sizes.js index 9ded23094..56ba4d3f5 100644 --- a/reference-implementation/lib/queue-with-sizes.js +++ b/reference-implementation/lib/queue-with-sizes.js @@ -2,39 +2,43 @@ const assert = require('assert'); const { IsFiniteNonNegativeNumber } = require('./helpers.js'); -exports.DequeueValue = queue => { - assert(queue.length > 0, 'Spec-level failure: should never dequeue from an empty queue.'); - const pair = queue.shift(); +exports.DequeueValue = container => { + assert('_queue' in container && '_queueTotalSize' in container, + 'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].'); + assert(container._queue.length > 0, 'Spec-level failure: should never dequeue from an empty queue.'); - queue._totalSize -= pair.size; + const pair = container._queue.shift(); + container._queueTotalSize -= pair.size; return pair.value; }; -exports.EnqueueValueWithSize = (queue, value, size) => { +exports.EnqueueValueWithSize = (container, value, size) => { + assert('_queue' in container && '_queueTotalSize' in container, + 'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].'); + size = Number(size); if (!IsFiniteNonNegativeNumber(size)) { throw new RangeError('Size must be a finite, non-NaN, non-negative number.'); } - queue.push({ value, size }); - - if (queue._totalSize === undefined) { - queue._totalSize = 0; - } - queue._totalSize += size; + container._queue.push({ value, size }); + container._queueTotalSize += size; }; -// This implementation is not per-spec. Total size is cached for speed. -exports.GetTotalQueueSize = queue => { - if (queue._totalSize === undefined) { - queue._totalSize = 0; - } - return queue._totalSize; -}; +exports.PeekQueueValue = container => { + assert('_queue' in container && '_queueTotalSize' in container, + 'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].'); + assert(container._queue.length > 0, 'Spec-level failure: should never peek at an empty queue.'); -exports.PeekQueueValue = queue => { - assert(queue.length > 0, 'Spec-level failure: should never peek at an empty queue.'); - const pair = queue[0]; + const pair = container._queue[0]; return pair.value; }; + +exports.ResetQueue = container => { + assert('_queue' in container && '_queueTotalSize' in container, + 'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].'); + + container._queue = []; + container._queueTotalSize = 0; +}; diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 5b47502b1..5368c5e00 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -5,7 +5,7 @@ const { ArrayBufferCopy, CreateIterResultObject, IsFiniteNonNegativeNumber, Invo require('./helpers.js'); const { createArrayFromList, createDataProperty, typeIsObject } = require('./helpers.js'); const { rethrowAssertionErrorRejection } = require('./utils.js'); -const { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize } = require('./queue-with-sizes.js'); +const { DequeueValue, EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); const { AcquireWritableStreamDefaultWriter, IsWritableStream, IsWritableStreamLocked, WritableStreamAbort, WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, WritableStreamDefaultWriterWrite } = require('./writable-stream.js'); @@ -853,7 +853,10 @@ class ReadableStreamDefaultController { this._underlyingSource = underlyingSource; - this._queue = []; + // Need to set the slots so that the assert doesn't fire. In the spec the slots already exist implicitly. + this._queue = this._queueTotalSize = undefined; + ResetQueue(this); + this._started = false; this._closeRequested = false; this._pullAgain = false; @@ -938,8 +941,7 @@ class ReadableStreamDefaultController { } [InternalCancel](reason) { - this._queue = []; - + ResetQueue(this); return PromiseInvokeOrNoop(this._underlyingSource, 'cancel', [reason]); } @@ -947,7 +949,7 @@ class ReadableStreamDefaultController { const stream = this._controlledReadableStream; if (this._queue.length > 0) { - const chunk = DequeueValue(this._queue); + const chunk = DequeueValue(this); if (this._closeRequested === true && this._queue.length === 0) { ReadableStreamClose(stream); @@ -1077,7 +1079,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) { } try { - EnqueueValueWithSize(controller._queue, chunk, chunkSize); + EnqueueValueWithSize(controller, chunk, chunkSize); } catch (enqueueE) { ReadableStreamDefaultControllerErrorIfNeeded(controller, enqueueE); throw enqueueE; @@ -1094,7 +1096,7 @@ function ReadableStreamDefaultControllerError(controller, e) { assert(stream._state === 'readable'); - controller._queue = []; + ResetQueue(controller); ReadableStreamError(stream, e); } @@ -1106,8 +1108,7 @@ function ReadableStreamDefaultControllerErrorIfNeeded(controller, e) { } function ReadableStreamDefaultControllerGetDesiredSize(controller) { - const queueSize = GetTotalQueueSize(controller._queue); - return controller._strategyHWM - queueSize; + return controller._strategyHWM - controller._queueTotalSize; } class ReadableStreamBYOBRequest { @@ -1171,11 +1172,11 @@ class ReadableByteStreamController { ReadableByteStreamControllerClearPendingPullIntos(this); - this._queue = []; - this._totalQueuedBytes = 0; + // Need to set the slots so that the assert doesn't fire. In the spec the slots already exist implicitly. + this._queue = this._queueTotalSize = undefined; + ResetQueue(this); this._closeRequested = false; - this._started = false; this._strategyHWM = ValidateAndNormalizeHighWaterMark(highWaterMark); @@ -1293,8 +1294,7 @@ class ReadableByteStreamController { firstDescriptor.bytesFilled = 0; } - this._queue = []; - this._totalQueuedBytes = 0; + ResetQueue(this); return PromiseInvokeOrNoop(this._underlyingByteSource, 'cancel', [reason]); } @@ -1304,9 +1304,9 @@ class ReadableByteStreamController { assert(ReadableStreamHasDefaultReader(stream) === true); if (ReadableStreamGetNumReadRequests(stream) === 0) { - if (this._totalQueuedBytes > 0) { + if (this._queueTotalSize > 0) { const entry = this._queue.shift(); - this._totalQueuedBytes -= entry.byteLength; + this._queueTotalSize -= entry.byteLength; ReadableByteStreamControllerHandleQueueDrain(this); @@ -1452,7 +1452,7 @@ function ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescripto function ReadableByteStreamControllerEnqueueChunkToQueue(controller, buffer, byteOffset, byteLength) { controller._queue.push({ buffer, byteOffset, byteLength }); - controller._totalQueuedBytes += byteLength; + controller._queueTotalSize += byteLength; } function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) { @@ -1460,7 +1460,7 @@ function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, const currentAlignedBytes = pullIntoDescriptor.bytesFilled - pullIntoDescriptor.bytesFilled % elementSize; - const maxBytesToCopy = Math.min(controller._totalQueuedBytes, + const maxBytesToCopy = Math.min(controller._queueTotalSize, pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled); const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy; const maxAlignedBytes = maxBytesFilled - maxBytesFilled % elementSize; @@ -1488,7 +1488,7 @@ function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, headOfQueue.byteOffset += bytesToCopy; headOfQueue.byteLength -= bytesToCopy; } - controller._totalQueuedBytes -= bytesToCopy; + controller._queueTotalSize -= bytesToCopy; ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor); @@ -1496,7 +1496,7 @@ function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, } if (ready === false) { - assert(controller._totalQueuedBytes === 0, 'queue must be empty'); + assert(controller._queueTotalSize === 0, 'queue must be empty'); assert(pullIntoDescriptor.bytesFilled > 0); assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize); } @@ -1514,7 +1514,7 @@ function ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, size function ReadableByteStreamControllerHandleQueueDrain(controller) { assert(controller._controlledReadableStream._state === 'readable'); - if (controller._totalQueuedBytes === 0 && controller._closeRequested === true) { + if (controller._queueTotalSize === 0 && controller._closeRequested === true) { ReadableStreamClose(controller._controlledReadableStream); } else { ReadableByteStreamControllerCallPullIfNeeded(controller); @@ -1535,7 +1535,7 @@ function ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(contro assert(controller._closeRequested === false); while (controller._pendingPullIntos.length > 0) { - if (controller._totalQueuedBytes === 0) { + if (controller._queueTotalSize === 0) { return; } @@ -1585,7 +1585,7 @@ function ReadableByteStreamControllerPullInto(controller, view) { return Promise.resolve(CreateIterResultObject(emptyView, true)); } - if (controller._totalQueuedBytes > 0) { + if (controller._queueTotalSize > 0) { if (ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) === true) { const filledView = ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor); @@ -1716,7 +1716,7 @@ function ReadableByteStreamControllerClose(controller) { assert(controller._closeRequested === false); assert(stream._state === 'readable'); - if (controller._totalQueuedBytes > 0) { + if (controller._queueTotalSize > 0) { controller._closeRequested = true; return; @@ -1772,13 +1772,12 @@ function ReadableByteStreamControllerError(controller, e) { ReadableByteStreamControllerClearPendingPullIntos(controller); - controller._queue = []; - + ResetQueue(controller); ReadableStreamError(stream, e); } function ReadableByteStreamControllerGetDesiredSize(controller) { - return controller._strategyHWM - controller._totalQueuedBytes; + return controller._strategyHWM - controller._queueTotalSize; } function ReadableByteStreamControllerRespond(controller, bytesWritten) { diff --git a/reference-implementation/lib/writable-stream.js b/reference-implementation/lib/writable-stream.js index d32a5fa8e..9468fbf57 100644 --- a/reference-implementation/lib/writable-stream.js +++ b/reference-implementation/lib/writable-stream.js @@ -3,7 +3,7 @@ const assert = require('assert'); const { InvokeOrNoop, PromiseInvokeOrNoop, ValidateAndNormalizeQueuingStrategy, typeIsObject } = require('./helpers.js'); const { rethrowAssertionErrorRejection } = require('./utils.js'); -const { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize, PeekQueueValue } = require('./queue-with-sizes.js'); +const { DequeueValue, EnqueueValueWithSize, PeekQueueValue, ResetQueue } = require('./queue-with-sizes.js'); class WritableStream { constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) { @@ -528,7 +528,10 @@ class WritableStreamDefaultController { this._underlyingSink = underlyingSink; - this._queue = []; + // Need to set the slots so that the assert doesn't fire. In the spec the slots already exist implicitly. + this._queue = this._queueTotalSize = undefined; + ResetQueue(this); + this._started = false; this._writing = false; this._inClose = false; @@ -575,20 +578,18 @@ class WritableStreamDefaultController { // Abstract operations implementing interface required by the WritableStream. function WritableStreamDefaultControllerAbort(controller, reason) { - controller._queue = []; - + ResetQueue(controller); const sinkAbortPromise = PromiseInvokeOrNoop(controller._underlyingSink, 'abort', [reason]); return sinkAbortPromise.then(() => undefined); } function WritableStreamDefaultControllerClose(controller) { - EnqueueValueWithSize(controller._queue, 'close', 0); + EnqueueValueWithSize(controller, 'close', 0); WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); } function WritableStreamDefaultControllerGetDesiredSize(controller) { - const queueSize = GetTotalQueueSize(controller._queue); - return controller._strategyHWM - queueSize; + return controller._strategyHWM - controller._queueTotalSize; } function WritableStreamDefaultControllerWrite(controller, chunk) { @@ -614,7 +615,7 @@ function WritableStreamDefaultControllerWrite(controller, chunk) { const lastBackpressure = WritableStreamDefaultControllerGetBackpressure(controller); try { - EnqueueValueWithSize(controller._queue, writeRecord, chunkSize); + EnqueueValueWithSize(controller, writeRecord, chunkSize); } catch (enqueueE) { WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueE); return; @@ -662,7 +663,7 @@ function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { return; } - const writeRecord = PeekQueueValue(controller._queue); + const writeRecord = PeekQueueValue(controller); if (writeRecord === 'close') { WritableStreamDefaultControllerProcessClose(controller); } else { @@ -682,7 +683,7 @@ function WritableStreamDefaultControllerProcessClose(controller) { assert(stream._state === 'closing', 'can\'t process final write record unless already closed'); - DequeueValue(controller._queue); + DequeueValue(controller); assert(controller._queue.length === 0, 'queue must be empty once the final write record is dequeued'); controller._inClose = true; @@ -743,7 +744,7 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) { return; } const lastBackpressure = WritableStreamDefaultControllerGetBackpressure(controller); - DequeueValue(controller._queue); + DequeueValue(controller); if (state !== 'closing') { const backpressure = WritableStreamDefaultControllerGetBackpressure(controller); if (lastBackpressure !== backpressure) { @@ -787,8 +788,7 @@ function WritableStreamDefaultControllerError(controller, e) { assert(stream._state === 'writable' || stream._state === 'closing'); WritableStreamError(stream, e); - - controller._queue = []; + ResetQueue(controller); } // Helper functions for the WritableStream. diff --git a/reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.https.html b/reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.https.html new file mode 100644 index 000000000..fc91ce597 --- /dev/null +++ b/reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.https.html @@ -0,0 +1,14 @@ + + + + + + + + + + + diff --git a/reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.js b/reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.js new file mode 100644 index 000000000..ab0caef46 --- /dev/null +++ b/reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.js @@ -0,0 +1,86 @@ +'use strict'; + +if (self.importScripts) { + self.importScripts('/resources/testharness.js'); +} + +promise_test(() => { + const { reader, controller } = setupTestStream(); + + controller.enqueue(2); + assert_equals(controller.desiredSize, 0 - 2, 'desiredSize must be -2 after enqueueing such a chunk'); + + controller.enqueue(Number.MAX_SAFE_INTEGER); + assert_equals(controller.desiredSize, 0 - Number.MAX_SAFE_INTEGER - 2, + 'desiredSize must be calculated using floating-point arithmetic (adding a second chunk)'); + + return reader.read().then(() => { + assert_equals(controller.desiredSize, 0 - Number.MAX_SAFE_INTEGER - 2 + 2, + 'desiredSize must be calculated using floating-point arithmetic (subtracting a chunk)'); + + return reader.read(); + }).then(() => { + assert_equals(controller.desiredSize, 0 - Number.MAX_SAFE_INTEGER - 2 + 2 + Number.MAX_SAFE_INTEGER, + 'desiredSize must be calculated using floating-point arithmetic (subtracting a second chunk)'); + }); +}, 'Floating point arithmetic must manifest near NUMBER.MAX_SAFE_INTEGER (total ends up positive)'); + +promise_test(() => { + const { reader, controller } = setupTestStream(); + + controller.enqueue(1e-16); + assert_equals(controller.desiredSize, 0 - 1e-16, 'desiredSize must be -1e16 after enqueueing such a chunk'); + + controller.enqueue(1); + assert_equals(controller.desiredSize, 0 - 1e-16 - 1, + 'desiredSize must be calculated using floating-point arithmetic (adding a second chunk)'); + + return reader.read().then(() => { + assert_equals(controller.desiredSize, 0 - 1e-16 - 1 + 1e-16, + 'desiredSize must be calculated using floating-point arithmetic (subtracting a chunk)'); + + return reader.read(); + }).then(() => { + assert_equals(controller.desiredSize, 0 - 1e-16 - 1 + 1e-16 + 1, + 'desiredSize must be calculated using floating-point arithmetic (subtracting a second chunk)'); + }); +}, 'Floating point arithmetic must manifest near 0 (total ends up positive)'); + +promise_test(() => { + const { reader, controller } = setupTestStream(); + + controller.enqueue(2e-16); + assert_equals(controller.desiredSize, 0 - 2e-16, 'desiredSize must be -2e16 after enqueueing such a chunk'); + + controller.enqueue(1); + assert_equals(controller.desiredSize, 0 - 2e-16 - 1, + 'desiredSize must be calculated using floating-point arithmetic (adding a second chunk)'); + + return reader.read().then(() => { + assert_equals(controller.desiredSize, 0 - 2e-16 - 1 + 2e-16, + 'desiredSize must be calculated using floating-point arithmetic (subtracting a chunk)'); + + return reader.read(); + }).then(() => { + assert_equals(controller.desiredSize, 0 - 2e-16 - 1 + 2e-16 + 1, + 'desiredSize must be calculated using floating-point arithmetic (subtracting a second chunk)'); + }); +}, 'Floating point arithmetic must manifest near 0 (total ends up zero)'); + +function setupTestStream() { + const strategy = { + size(x) { + return x; + }, + highWaterMark: 0 + }; + + let controller; + const rs = new ReadableStream({ + start(c) { + controller = c; + } + }, strategy); + + return { reader: rs.getReader(), controller }; +} diff --git a/reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.https.html b/reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.https.html new file mode 100644 index 000000000..fc91ce597 --- /dev/null +++ b/reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.https.html @@ -0,0 +1,14 @@ + + + + + + + + + + + diff --git a/reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.js b/reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.js new file mode 100644 index 000000000..d7ed25976 --- /dev/null +++ b/reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.js @@ -0,0 +1,69 @@ +'use strict'; + +if (self.importScripts) { + self.importScripts('/resources/testharness.js'); +} + +promise_test(() => { + const writer = setupTestStream(); + + const writePromises = [ + writer.write(2), + writer.write(Number.MAX_SAFE_INTEGER) + ]; + + assert_equals(writer.desiredSize, 0 - 2 - Number.MAX_SAFE_INTEGER, + 'desiredSize must be calculated using floating-point arithmetic (after writing two chunks)'); + + return Promise.all(writePromises).then(() => { + assert_equals(writer.desiredSize, 0 - 2 - Number.MAX_SAFE_INTEGER + 2 + Number.MAX_SAFE_INTEGER, + 'desiredSize must be calculated using floating-point arithmetic (after the two chunks have finished writing)'); + }); +}, 'Floating point arithmetic must manifest near NUMBER.MAX_SAFE_INTEGER (total ends up positive)'); + +promise_test(() => { + const writer = setupTestStream(); + + const writePromises = [ + writer.write(1e-16), + writer.write(1) + ]; + + assert_equals(writer.desiredSize, 0 - 1e-16 - 1, + 'desiredSize must be calculated using floating-point arithmetic (after writing two chunks)'); + + return Promise.all(writePromises).then(() => { + assert_equals(writer.desiredSize, 0 - 1e-16 - 1 + 1e-16 + 1, + 'desiredSize must be calculated using floating-point arithmetic (after the two chunks have finished writing)'); + }); +}, 'Floating point arithmetic must manifest near 0 (total ends up positive)'); + +promise_test(() => { + const writer = setupTestStream(); + + const writePromises = [ + writer.write(2e-16), + writer.write(1) + ]; + + assert_equals(writer.desiredSize, 0 - 2e-16 - 1, + 'desiredSize must be calculated using floating-point arithmetic (after writing two chunks)'); + + return Promise.all(writePromises).then(() => { + assert_equals(writer.desiredSize, 0 - 2e-16 - 1 + 2e-16 + 1, + 'desiredSize must be calculated using floating-point arithmetic (after the two chunks have finished writing)'); + }); +}, 'Floating point arithmetic must manifest near 0 (total ends up zero)'); + +function setupTestStream() { + const strategy = { + size(x) { + return x; + }, + highWaterMark: 0 + }; + + const ws = new WritableStream({}, strategy); + + return ws.getWriter(); +}