diff --git a/index.bs b/index.bs index a75dfbae5..dc1facf2b 100644 --- a/index.bs +++ b/index.bs @@ -30,6 +30,7 @@ urlPrefix: https://tc39.github.io/ecma262/; spec: ECMASCRIPT text: %Uint8Array%; url: #sec-typedarray-objects; type: constructor text: ArrayBuffer; url: #sec-arraybuffer-objects; type: interface text: DataView; url: #sec-dataview-objects; type: interface + text: Number; url: #sec-ecmascript-language-types-number-type; type: interface text: Uint8Array; url: #sec-typedarray-objects; type: interface text: typed array; url: #sec-typedarray-objects; type: dfn text: the typed array constructors table; url: #table-49; type: dfn @@ -1467,6 +1468,10 @@ Instances of {{ReadableStreamDefaultController}} are created with the internal s \[[queue]] A List representing the stream's internal queue of chunks + + \[[queueTotalSize]] + The total size of all the chunks stored in \[[queue]] (see [[#queue-with-sizes]]) + \[[started]] A boolean flag indicating whether the underlying source has finished starting @@ -1503,7 +1508,7 @@ ReadableStreamDefaultController(stream, underlyingSource, 1. If _stream_.[[readableStreamController]] is not *undefined*, throw a *TypeError* exception. 1. Set *this*.[[controlledReadableStream]] to _stream_. 1. Set *this*.[[underlyingSource]] to _underlyingSource_. - 1. Set *this*.[[queue]] to a new empty List. + 1. Perform ! ResetQueue(*this*). 1. Set *this*.[[started]], *this*.[[closeRequested]], *this*.[[pullAgain]], and *this*.[[pulling]] to *false*. 1. Let _normalizedStrategy_ be ? ValidateAndNormalizeQueuingStrategy(_size_, _highWaterMark_). 1. Set *this*.[[strategySize]] to _normalizedStrategy_.[[size]] and *this*.[[strategyHWM]] to @@ -1586,7 +1591,7 @@ polymorphic dispatch from the readable stream implementation to either these or
\[[Cancel]](reason)
- 1. Set *this*.[[queue]] to a new empty List. + 1. Perform ! ResetQueue(*this*). 1. Return ! PromiseInvokeOrNoop(*this*.[[underlyingSource]], `"cancel"`, « _reason_ ») @@ -1595,7 +1600,7 @@ polymorphic dispatch from the readable stream implementation to either these or 1. Let _stream_ be *this*.[[controlledReadableStream]]. 1. If *this*.[[queue]] is not empty, - 1. Let _chunk_ be ! DequeueValue(*this*.[[queue]]). + 1. Let _chunk_ be ! DequeueValue(*this*). 1. If *this*.[[closeRequested]] is *true* and *this*.[[queue]] is empty, perform ! ReadableStreamClose(_stream_). 1. Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(*this*). 1. Return a promise resolved with ! CreateIterResultObject(_chunk_, *false*). @@ -1688,7 +1693,7 @@ asserts). 1. Perform ! ReadableStreamDefaultControllerErrorIfNeeded(_controller_, _chunkSize_.[[Value]]). 1. Return _chunkSize_. 1. Let _chunkSize_ be _chunkSize_.[[Value]]. - 1. Let _enqueueResult_ be ! EnqueueValueWithSize(_controller_.[[queue]], _chunk_, _chunkSize_). + 1. Let _enqueueResult_ be ! EnqueueValueWithSize(_controller_, _chunk_, _chunkSize_). 1. If _enqueueResult_ is an abrupt completion, 1. Perform ! ReadableStreamDefaultControllerErrorIfNeeded(_controller_, _enqueueResult_.[[Value]]). 1. Return _enqueueResult_. @@ -1714,7 +1719,7 @@ an assert). 1. Let _stream_ be _controller_.[[controlledReadableStream]]. 1. Assert: _stream_.[[state]] is `"readable"`. - 1. Set _controller_.[[queue]] to a new empty List. + 1. Perform ! ResetQueue(_controller_). 1. Perform ! ReadableStreamError(_stream_, _e_). @@ -1735,8 +1740,7 @@ the {{ReadableStreamDefaultController/desiredSize}} property of the stream's ass Specifications should not use this on streams they did not create. - 1. Let _queueSize_ be ! GetTotalQueueSize(_controller_.[[queue]]). - 1. Return _controller_.[[strategyHWM]] − _queueSize_. + 1. Return _controller_.[[strategyHWM]] − _controller_.[[queueTotalSize]].

Class @@ -1814,6 +1818,10 @@ Instances of {{ReadableByteStreamController}} are created with the internal slot \[[queue]] A List representing the stream's internal queue of chunks + + \[[queueTotalSize]] + The total size (in bytes) of all the chunks stored in \[[queue]] + \[[started]] A boolean flag indicating whether the underlying source has finished starting @@ -1823,10 +1831,6 @@ Instances of {{ReadableByteStreamController}} are created with the internal slot A number supplied to the constructor as part of the stream's queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source - - \[[totalQueuedBytes]] - The number of bytes stored in \[[queue]] - \[[underlyingByteSource]] An object representation of the stream's underlying byte source; @@ -1834,6 +1838,14 @@ Instances of {{ReadableByteStreamController}} are created with the internal slot +
+

Although {{ReadableByteStreamController}} instances have \[[queue]] and \[[queueTotalSize]] slots, we do not use + most of the abstract operations in [[#queue-with-sizes]] on them, as the way in which we manipulate this queue is + rather different than the others in the spec. Instead, we update the two slots together manually.

+ +

This might be cleaned up in a future spec refactoring.

+
+

new ReadableByteStreamController(stream, underlyingByteSource, highWaterMark)

@@ -1850,9 +1862,8 @@ ReadableByteStreamController(stream, underlyingByteSource, 1. Set *this*.[[underlyingByteSource]] to _underlyingByteSource_. 1. Set *this*.[[pullAgain]], and *this*.[[pulling]] to *false*. 1. Perform ! ReadableByteStreamControllerClearPendingPullIntos(*this*). - 1. Set *this*.[[queue]] to a new empty List. - 1. Set *this*.[[totalQueuedBytes]] to *0*. - 1. Set *this*.[[started]], and *this*.[[closeRequested]] to *false*. + 1. Perform ! ResetQueue(*this*). + 1. Set *this*.[[started]] and *this*.[[closeRequested]] to *false*. 1. Set *this*.[[strategyHWM]] to ? ValidateAndNormalizeHighWaterMark(_highWaterMark_). 1. Let _autoAllocateChunkSize_ be ? GetV(_underlyingByteSource_, `"autoAllocateChunkSize"`). 1. If _autoAllocateChunkSize_ is not *undefined*, @@ -1958,8 +1969,7 @@ dispatch from the readable stream implementation to either these or their counte 1. If *this*.[[pendingPullIntos]] is not empty, 1. Let _firstDescriptor_ be the first element of *this*.[[pendingPullIntos]]. 1. Set _firstDescriptor_.[[bytesFilled]] to *0*. - 1. Set *this*.[[queue]] to a new empty List. - 1. Set *this*.[[totalQueuedBytes]] to *0*. + 1. Perform ! ResetQueue(*this*). 1. Return ! PromiseInvokeOrNoop(*this*.[[underlyingByteSource]], `"cancel"`, « _reason_ »)
@@ -1969,11 +1979,11 @@ dispatch from the readable stream implementation to either these or their counte 1. Let _stream_ be *this*.[[controlledReadableStream]]. 1. Assert: ! ReadableStreamHasDefaultReader(_stream_) is *true*. 1. If ! ReadableStreamGetNumReadRequests(_stream_) is *0*, - 1. If *this*.[[totalQueuedBytes]] > *0*, + 1. If *this*.[[queueTotalSize]] > *0*, 1. Let _entry_ be the first element of *this*.[[queue]]. 1. Remove _entry_ from *this*.[[queue]], shifting all other elements downward (so that the second becomes the first, and so on). - 1. Set *this*.[[totalQueuedBytes]] to *this*.[[totalQueuedBytes]] − _entry_.[[byteLength]]. + 1. Set *this*.[[queueTotalSize]] to *this*.[[queueTotalSize]] − _entry_.[[byteLength]]. 1. Perform ! ReadableByteStreamControllerHandleQueueDrain(*this*). 1. Let _view_ be ! Construct(%Uint8Array%, « _entry_.[[buffer]], _entry_.[[byteOffset]], _entry_.[[byteLength]] »). @@ -2133,7 +2143,7 @@ throws>ReadableByteStreamControllerClose ( controller ) 1. Let _stream_ be _controller_.[[controlledReadableStream]]. 1. Assert: _controller_.[[closeRequested]] is *false*. 1. Assert: _stream_.[[state]] is `"readable"`. - 1. If _controller_.[[totalQueuedBytes]] > *0*, + 1. If _controller_.[[queueTotalSize]] > *0*, 1. Set _controller_.[[closeRequested]] to *true*. 1. Return. 1. If _controller_.[[pendingPullIntos]] is not empty, @@ -2213,7 +2223,7 @@ nothrow>ReadableByteStreamControllerEnqueueChunkToQueue ( controller, 1. Append Record {[[buffer]]: _buffer_, [[byteOffset]]: _byteOffset_, [[byteLength]]: _byteLength_} as the last element of _controller_.[[queue]]. - 1. Add _byteLength_ to _controller_.[[totalQueuedBytes]]. + 1. Add _byteLength_ to _controller_.[[queueTotalSize]].

ReadableByteStreamControllerError ( controller, e 1. Let _stream_ be _controller_.[[controlledReadableStream]]. 1. Assert: _stream_.[[state]] is `"readable"`. 1. Perform ! ReadableByteStreamControllerClearPendingPullIntos(_controller_). - 1. Let _controller_.[[queue]] be a new empty List. + 1. Perform ! ResetQueue(_controller_). 1. Perform ! ReadableStreamError(_stream_, _e_). @@ -2248,7 +2258,7 @@ nothrow>ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( contr 1. Let _elementSize_ be _pullIntoDescriptor_.[[elementSize]]. 1. Let _currentAlignedBytes_ be _pullIntoDescriptor_.[[bytesFilled]] − (_pullIntoDescriptor_.[[bytesFilled]] mod _elementSize_). - 1. Let _maxBytesToCopy_ be min(_controller_.[[totalQueuedBytes]], _pullIntoDescriptor_.[[byteLength]] − + 1. Let _maxBytesToCopy_ be min(_controller_.[[queueTotalSize]], _pullIntoDescriptor_.[[byteLength]] − _pullIntoDescriptor_.[[bytesFilled]]). 1. Let _maxBytesFilled_ be _pullIntoDescriptor_.[[bytesFilled]] + _maxBytesToCopy_. 1. Let _maxAlignedBytes_ be _maxBytesFilled_ − (_maxBytesFilled_ mod _elementSize_). @@ -2270,12 +2280,12 @@ nothrow>ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( contr 1. Otherwise, 1. Set _headOfQueue_.[[byteOffset]] to _headOfQueue_.[[byteOffset]] + _bytesToCopy_. 1. Set _headOfQueue_.[[byteLength]] to _headOfQueue_.[[byteLength]] − _bytesToCopy_. - 1. Set _controller_.[[totalQueuedBytes]] to _controller_.[[totalQueuedBytes]] − _bytesToCopy_. + 1. Set _controller_.[[queueTotalSize]] to _controller_.[[queueTotalSize]] − _bytesToCopy_. 1. Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(_controller_, _bytesToCopy_, _pullIntoDescriptor_). 1. Set _totalBytesToCopyRemaining_ to _totalBytesToCopyRemaining_ − _bytesToCopy_. 1. If _ready_ is *false*, - 1. Assert: _controller_.[[totalQueuedBytes]] is *0*. + 1. Assert: _controller_.[[queueTotalSize]] is *0*. 1. Assert: _pullIntoDescriptor_.[[bytesFilled]] > *0*. 1. Assert: _pullIntoDescriptor_.[[bytesFilled]] < _pullIntoDescriptor_.[[elementSize]]. 1. Return _ready_. @@ -2285,7 +2295,7 @@ nothrow>ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( contr nothrow>ReadableByteStreamControllerGetDesiredSize ( controller )

- 1. Return _controller_.[[strategyHWM]] − _controller_.[[totalQueuedBytes]]. + 1. Return _controller_.[[strategyHWM]] − _controller_.[[queueTotalSize]].

ReadableByteStreamControllerHandleQueueDrain ( controller ) 1. Assert: _controller_.[[controlledReadableStream]].[[state]] is `"readable"`. - 1. If _controller_.[[totalQueuedBytes]] is *0* and _controller_.[[closeRequested]] is *true*, + 1. If _controller_.[[queueTotalSize]] is *0* and _controller_.[[closeRequested]] is *true*, 1. Perform ! ReadableStreamClose(_controller_.[[controlledReadableStream]]). 1. Otherwise, 1. Perform ! ReadableByteStreamControllerCallPullIfNeeded(_controller_). @@ -2317,7 +2327,7 @@ nothrow>ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue ( 1. Assert: _controller_.[[closeRequested]] is *false*. 1. Repeat the following steps while _controller_.[[pendingPullIntos]] is not empty, - 1. If _controller_.[[totalQueuedBytes]] is *0*, return. + 1. If _controller_.[[queueTotalSize]] is *0*, return. 1. Let _pullIntoDescriptor_ be the first element of _controller_.[[pendingPullIntos]]. 1. If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(_controller_, _pullIntoDescriptor_) is *true*, 1. Perform ! ReadableByteStreamControllerShiftPendingPullInto(_controller_). @@ -2348,7 +2358,7 @@ nothrow>ReadableByteStreamControllerPullInto ( controller, view< 1. If _stream_.[[state]] is `"closed"`, 1. Let _emptyView_ be ! Construct(_ctor_, « _pullIntoDescriptor_.[[buffer]], _pullIntoDescriptor_.[[byteOffset]], *0* »). 1. Return a promise resolved with ! CreateIterResultObject(_emptyView_, *true*). - 1. If _controller_.[[totalQueuedBytes]] > *0*, + 1. If _controller_.[[queueTotalSize]] > *0*, 1. If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(_controller_, _pullIntoDescriptor_) is *true*, 1. Let _filledView_ be ! ReadableByteStreamControllerConvertPullIntoDescriptor(_pullIntoDescriptor_). 1. Perform ! ReadableByteStreamControllerHandleQueueDrain(_controller_). @@ -3228,6 +3238,10 @@ Instances of {{WritableStreamDefaultController}} are created with the internal s \[[queue]] A List representing the stream's internal queue of chunks + + \[[queueTotalSize]] + The total size of all the chunks stored in \[[queue]] (see [[#queue-with-sizes]]) + \[[started]] A boolean flag indicating whether the underlying sink has finished starting @@ -3269,8 +3283,8 @@ WritableStreamDefaultController(stream, underlyingSink, IsWritableStreamDefaultController ( x )

nothrow>WritableStreamDefaultControllerAbort ( controller, reason ) - 1. Set _controller_.[[queue]] to a new empty List. + 1. Perform ! ResetQueue(_controller_). 1. Let _sinkAbortPromise_ be ! PromiseInvokeOrNoop(_controller_.[[underlyingSink]], `"abort"`, « _reason_ »). 1. Return the result of transforming _sinkAbortPromise_ by a fulfillment handler that returns @@ -3332,7 +3346,7 @@ nothrow>WritableStreamDefaultControllerAbort ( controller, reaso nothrow>WritableStreamDefaultControllerClose ( controller ) - 1. Perform ! EnqueueValueWithSize(_controller_.[[queue]], `"close"`, *0*). + 1. Perform ! EnqueueValueWithSize(_controller_, `"close"`, *0*). 1. Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(_controller_). @@ -3340,8 +3354,7 @@ nothrow>WritableStreamDefaultControllerClose ( controller ) nothrow>WritableStreamDefaultControllerGetDesiredSize ( controller ) - 1. Let _queueSize_ be ! GetTotalQueueSize(_controller_.[[queue]]). - 1. Return _controller_.[[strategyHWM]] − _queueSize_. + 1. Return _controller_.[[strategyHWM]] − _controller_.[[queueTotalSize]].

WritableStreamDefaultControllerWrite ( controller, chunk 1. Return. 1. Let _writeRecord_ be Record {[[chunk]]: _chunk_}. 1. Let _lastBackpressure_ be ! WritableStreamDefaultControllerGetBackpressure(_controller_). - 1. Let _enqueueResult_ be ! EnqueueValueWithSize(_controller_.[[queue]], _writeRecord_, _chunkSize_). + 1. Let _enqueueResult_ be ! EnqueueValueWithSize(_controller_, _writeRecord_, _chunkSize_). 1. If _enqueueResult_ is an abrupt completion, 1. Perform ! WritableStreamDefaultControllerErrorIfNeeded(_controller_, _enqueueResult_.[[Value]]). 1. Return. @@ -3378,7 +3391,7 @@ aoid="WritableStreamDefaultControllerAdvanceQueueIfNeeded" nothrow>WritableStrea 1. If _controller_.[[started]] is *false*, return. 1. If _controller_.[[writing]] is *true*, return. 1. If _controller_.[[queue]] is empty, return. - 1. Let _writeRecord_ be ! PeekQueueValue(_controller_.[[queue]]). + 1. Let _writeRecord_ be ! PeekQueueValue(_controller_). 1. If _writeRecord_ is `"close"`, perform WritableStreamDefaultControllerProcessClose(_controller_). 1. Otherwise, perform WritableStreamDefaultControllerProcessWrite(_controller_, _writeRecord_.[[chunk]]). @@ -3397,7 +3410,7 @@ nothrow>WritableStreamDefaultControllerProcessClose ( controller ) 1. Let _stream_ be _controller_.[[controlledWritableStream]]. 1. Assert: _stream_.[[state]] is `"closing"`. - 1. Perform ! DequeueValue(_controller_.[[queue]]). + 1. Perform ! DequeueValue(_controller_). 1. Assert: _controller_.[[queue]] is empty. 1. Set _controller_.[[inClose]] to *true*. 1. Let _sinkClosePromise_ be ! PromiseInvokeOrNoop(_controller_.[[underlyingSink]], `"close"`, « _controller_ »). @@ -3449,7 +3462,7 @@ nothrow>WritableStreamDefaultControllerProcessWrite ( controller, WritableStreamDefaultControllerError ( controller, e

Transform Streams

@@ -3651,46 +3664,55 @@ CountQueuingStrategy({ highWaterMark })

Queue-with-Sizes Operations

The streams in this specification use a "queue-with-sizes" data structure to store queued up values, along with their -determined sizes. A queue-with-sizes is a List of Records with \[[value]] and \[[size]] fields (although in -implementations it would of course be backed by a more efficient data structure). +determined sizes. Various specification objects contain a queue-with-sizes, represented by the object having two paired +internal slots, always named \[[queue]] and \[[queueTotalSize]]. \[[queue]] is a List of Records with \[[value]] and +\[[size]] fields, and \[[queueTotalSize]] is a JavaScript {{Number}}, i.e. a double-precision floating point number. -A number of abstract operations are specified here to make working with queues-with-sizes more pleasant, and used -throughout the rest of this standard. +The following abstract operations are used when operating on objects that contain queues-with-sizes, in order to ensure +that the two internal slots stay synchronized. -

DequeueValue ( queue )

+

Due to the vagaries of floating point arithmetic, the framework specified here, of keeping a running +total in the \[[queueTotalSize]] slot, is not equivalent to adding up the size of all chunks in +\[[queue]]. This is most apparent when the total size is very small or very large.

+ +

DequeueValue ( container )

- 1. Assert: _queue_ is not empty. - 1. Let _pair_ be the first element of _queue_. - 1. Remove _pair_ from _queue_, shifting all other elements downward (so that the second becomes the first, and so on). + 1. Assert: _container_ has [[queue]] and [[queueTotalSize]] internal slots. + 1. Assert: _container_.[[queue]] is not empty. + 1. Let _pair_ be the first element of _container_.[[queue]]. + 1. Remove _pair_ from _container_.[[queue]], shifting all other elements downward (so that the second becomes the + first, and so on). + 1. Set _container_.[[queueTotalSize]] to _container_.[[queueTotalSize]] − _pair_.[[size]]. 1. Return _pair_.[[value]]. -

EnqueueValueWithSize ( queue, +

EnqueueValueWithSize ( container, value, size )

+ 1. Assert: _container_ has [[queue]] and [[queueTotalSize]] internal slots. 1. Let _size_ be ? ToNumber(_size_). 1. If ! IsFiniteNonNegativeNumber(_size_) is *false*, throw a *RangeError* exception. - 1. Append Record {[[value]]: _value_, [[size]]: _size_} as the last element of _queue_. + 1. Append Record {[[value]]: _value_, [[size]]: _size_} as the last element of _container_.[[queue]]. + 1. Set _container_.[[queueTotalSize]] to _container_.[[queueTotalSize]] + _size_. -

GetTotalQueueSize ( queue )

+

PeekQueueValue ( container )

- 1. Let _totalSize_ be *0*. - 1. Repeat for each Record {[[value]], [[size]]} _pair_ that is an element of _queue_, - 1. Assert: _pair_.[[size]] is a finite, non-*NaN* number. - 1. Set _totalSize_ to _totalSize_ + _pair_.[[size]]. - 1. Return _totalSize_. + 1. Assert: _container_ has [[queue]] and [[queueTotalSize]] internal slots. + 1. Assert: _container_.[[queue]] is not empty. + 1. Let _pair_ be the first element of _container_.[[queue]]. + 1. Return _pair_.[[value]]. -

PeekQueueValue ( queue )

+

ResetQueue ( container )

- 1. Assert: _queue_ is not empty. - 1. Let _pair_ be the first element of _queue_. - 1. Return _pair_.[[value]]. + 1. Assert: _container_ has [[queue]] and [[queueTotalSize]] internal slots. + 1. Set _container_.[[queue]] to a new empty List. + 1. Set _container_.[[queueTotalSize]] to *0*.

Miscellaneous Operations

@@ -4255,6 +4277,9 @@ itself will evolve in these ways. ECMAScript spec does. +It's also worth noting that, as in [[!ECMASCRIPT]], all numbers are represented as double-precision floating point +values, and all arithmetic operations performed on them must be done in the usual way for such values. +

Acknowledgments

The editor would like to thank diff --git a/reference-implementation/lib/queue-with-sizes.js b/reference-implementation/lib/queue-with-sizes.js index 9ded23094..56ba4d3f5 100644 --- a/reference-implementation/lib/queue-with-sizes.js +++ b/reference-implementation/lib/queue-with-sizes.js @@ -2,39 +2,43 @@ const assert = require('assert'); const { IsFiniteNonNegativeNumber } = require('./helpers.js'); -exports.DequeueValue = queue => { - assert(queue.length > 0, 'Spec-level failure: should never dequeue from an empty queue.'); - const pair = queue.shift(); +exports.DequeueValue = container => { + assert('_queue' in container && '_queueTotalSize' in container, + 'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].'); + assert(container._queue.length > 0, 'Spec-level failure: should never dequeue from an empty queue.'); - queue._totalSize -= pair.size; + const pair = container._queue.shift(); + container._queueTotalSize -= pair.size; return pair.value; }; -exports.EnqueueValueWithSize = (queue, value, size) => { +exports.EnqueueValueWithSize = (container, value, size) => { + assert('_queue' in container && '_queueTotalSize' in container, + 'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].'); + size = Number(size); if (!IsFiniteNonNegativeNumber(size)) { throw new RangeError('Size must be a finite, non-NaN, non-negative number.'); } - queue.push({ value, size }); - - if (queue._totalSize === undefined) { - queue._totalSize = 0; - } - queue._totalSize += size; + container._queue.push({ value, size }); + container._queueTotalSize += size; }; -// This implementation is not per-spec. Total size is cached for speed. -exports.GetTotalQueueSize = queue => { - if (queue._totalSize === undefined) { - queue._totalSize = 0; - } - return queue._totalSize; -}; +exports.PeekQueueValue = container => { + assert('_queue' in container && '_queueTotalSize' in container, + 'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].'); + assert(container._queue.length > 0, 'Spec-level failure: should never peek at an empty queue.'); -exports.PeekQueueValue = queue => { - assert(queue.length > 0, 'Spec-level failure: should never peek at an empty queue.'); - const pair = queue[0]; + const pair = container._queue[0]; return pair.value; }; + +exports.ResetQueue = container => { + assert('_queue' in container && '_queueTotalSize' in container, + 'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].'); + + container._queue = []; + container._queueTotalSize = 0; +}; diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 5b47502b1..5368c5e00 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -5,7 +5,7 @@ const { ArrayBufferCopy, CreateIterResultObject, IsFiniteNonNegativeNumber, Invo require('./helpers.js'); const { createArrayFromList, createDataProperty, typeIsObject } = require('./helpers.js'); const { rethrowAssertionErrorRejection } = require('./utils.js'); -const { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize } = require('./queue-with-sizes.js'); +const { DequeueValue, EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); const { AcquireWritableStreamDefaultWriter, IsWritableStream, IsWritableStreamLocked, WritableStreamAbort, WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, WritableStreamDefaultWriterWrite } = require('./writable-stream.js'); @@ -853,7 +853,10 @@ class ReadableStreamDefaultController { this._underlyingSource = underlyingSource; - this._queue = []; + // Need to set the slots so that the assert doesn't fire. In the spec the slots already exist implicitly. + this._queue = this._queueTotalSize = undefined; + ResetQueue(this); + this._started = false; this._closeRequested = false; this._pullAgain = false; @@ -938,8 +941,7 @@ class ReadableStreamDefaultController { } [InternalCancel](reason) { - this._queue = []; - + ResetQueue(this); return PromiseInvokeOrNoop(this._underlyingSource, 'cancel', [reason]); } @@ -947,7 +949,7 @@ class ReadableStreamDefaultController { const stream = this._controlledReadableStream; if (this._queue.length > 0) { - const chunk = DequeueValue(this._queue); + const chunk = DequeueValue(this); if (this._closeRequested === true && this._queue.length === 0) { ReadableStreamClose(stream); @@ -1077,7 +1079,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) { } try { - EnqueueValueWithSize(controller._queue, chunk, chunkSize); + EnqueueValueWithSize(controller, chunk, chunkSize); } catch (enqueueE) { ReadableStreamDefaultControllerErrorIfNeeded(controller, enqueueE); throw enqueueE; @@ -1094,7 +1096,7 @@ function ReadableStreamDefaultControllerError(controller, e) { assert(stream._state === 'readable'); - controller._queue = []; + ResetQueue(controller); ReadableStreamError(stream, e); } @@ -1106,8 +1108,7 @@ function ReadableStreamDefaultControllerErrorIfNeeded(controller, e) { } function ReadableStreamDefaultControllerGetDesiredSize(controller) { - const queueSize = GetTotalQueueSize(controller._queue); - return controller._strategyHWM - queueSize; + return controller._strategyHWM - controller._queueTotalSize; } class ReadableStreamBYOBRequest { @@ -1171,11 +1172,11 @@ class ReadableByteStreamController { ReadableByteStreamControllerClearPendingPullIntos(this); - this._queue = []; - this._totalQueuedBytes = 0; + // Need to set the slots so that the assert doesn't fire. In the spec the slots already exist implicitly. + this._queue = this._queueTotalSize = undefined; + ResetQueue(this); this._closeRequested = false; - this._started = false; this._strategyHWM = ValidateAndNormalizeHighWaterMark(highWaterMark); @@ -1293,8 +1294,7 @@ class ReadableByteStreamController { firstDescriptor.bytesFilled = 0; } - this._queue = []; - this._totalQueuedBytes = 0; + ResetQueue(this); return PromiseInvokeOrNoop(this._underlyingByteSource, 'cancel', [reason]); } @@ -1304,9 +1304,9 @@ class ReadableByteStreamController { assert(ReadableStreamHasDefaultReader(stream) === true); if (ReadableStreamGetNumReadRequests(stream) === 0) { - if (this._totalQueuedBytes > 0) { + if (this._queueTotalSize > 0) { const entry = this._queue.shift(); - this._totalQueuedBytes -= entry.byteLength; + this._queueTotalSize -= entry.byteLength; ReadableByteStreamControllerHandleQueueDrain(this); @@ -1452,7 +1452,7 @@ function ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescripto function ReadableByteStreamControllerEnqueueChunkToQueue(controller, buffer, byteOffset, byteLength) { controller._queue.push({ buffer, byteOffset, byteLength }); - controller._totalQueuedBytes += byteLength; + controller._queueTotalSize += byteLength; } function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) { @@ -1460,7 +1460,7 @@ function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, const currentAlignedBytes = pullIntoDescriptor.bytesFilled - pullIntoDescriptor.bytesFilled % elementSize; - const maxBytesToCopy = Math.min(controller._totalQueuedBytes, + const maxBytesToCopy = Math.min(controller._queueTotalSize, pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled); const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy; const maxAlignedBytes = maxBytesFilled - maxBytesFilled % elementSize; @@ -1488,7 +1488,7 @@ function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, headOfQueue.byteOffset += bytesToCopy; headOfQueue.byteLength -= bytesToCopy; } - controller._totalQueuedBytes -= bytesToCopy; + controller._queueTotalSize -= bytesToCopy; ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor); @@ -1496,7 +1496,7 @@ function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, } if (ready === false) { - assert(controller._totalQueuedBytes === 0, 'queue must be empty'); + assert(controller._queueTotalSize === 0, 'queue must be empty'); assert(pullIntoDescriptor.bytesFilled > 0); assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize); } @@ -1514,7 +1514,7 @@ function ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, size function ReadableByteStreamControllerHandleQueueDrain(controller) { assert(controller._controlledReadableStream._state === 'readable'); - if (controller._totalQueuedBytes === 0 && controller._closeRequested === true) { + if (controller._queueTotalSize === 0 && controller._closeRequested === true) { ReadableStreamClose(controller._controlledReadableStream); } else { ReadableByteStreamControllerCallPullIfNeeded(controller); @@ -1535,7 +1535,7 @@ function ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(contro assert(controller._closeRequested === false); while (controller._pendingPullIntos.length > 0) { - if (controller._totalQueuedBytes === 0) { + if (controller._queueTotalSize === 0) { return; } @@ -1585,7 +1585,7 @@ function ReadableByteStreamControllerPullInto(controller, view) { return Promise.resolve(CreateIterResultObject(emptyView, true)); } - if (controller._totalQueuedBytes > 0) { + if (controller._queueTotalSize > 0) { if (ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) === true) { const filledView = ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor); @@ -1716,7 +1716,7 @@ function ReadableByteStreamControllerClose(controller) { assert(controller._closeRequested === false); assert(stream._state === 'readable'); - if (controller._totalQueuedBytes > 0) { + if (controller._queueTotalSize > 0) { controller._closeRequested = true; return; @@ -1772,13 +1772,12 @@ function ReadableByteStreamControllerError(controller, e) { ReadableByteStreamControllerClearPendingPullIntos(controller); - controller._queue = []; - + ResetQueue(controller); ReadableStreamError(stream, e); } function ReadableByteStreamControllerGetDesiredSize(controller) { - return controller._strategyHWM - controller._totalQueuedBytes; + return controller._strategyHWM - controller._queueTotalSize; } function ReadableByteStreamControllerRespond(controller, bytesWritten) { diff --git a/reference-implementation/lib/writable-stream.js b/reference-implementation/lib/writable-stream.js index d32a5fa8e..9468fbf57 100644 --- a/reference-implementation/lib/writable-stream.js +++ b/reference-implementation/lib/writable-stream.js @@ -3,7 +3,7 @@ const assert = require('assert'); const { InvokeOrNoop, PromiseInvokeOrNoop, ValidateAndNormalizeQueuingStrategy, typeIsObject } = require('./helpers.js'); const { rethrowAssertionErrorRejection } = require('./utils.js'); -const { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize, PeekQueueValue } = require('./queue-with-sizes.js'); +const { DequeueValue, EnqueueValueWithSize, PeekQueueValue, ResetQueue } = require('./queue-with-sizes.js'); class WritableStream { constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) { @@ -528,7 +528,10 @@ class WritableStreamDefaultController { this._underlyingSink = underlyingSink; - this._queue = []; + // Need to set the slots so that the assert doesn't fire. In the spec the slots already exist implicitly. + this._queue = this._queueTotalSize = undefined; + ResetQueue(this); + this._started = false; this._writing = false; this._inClose = false; @@ -575,20 +578,18 @@ class WritableStreamDefaultController { // Abstract operations implementing interface required by the WritableStream. function WritableStreamDefaultControllerAbort(controller, reason) { - controller._queue = []; - + ResetQueue(controller); const sinkAbortPromise = PromiseInvokeOrNoop(controller._underlyingSink, 'abort', [reason]); return sinkAbortPromise.then(() => undefined); } function WritableStreamDefaultControllerClose(controller) { - EnqueueValueWithSize(controller._queue, 'close', 0); + EnqueueValueWithSize(controller, 'close', 0); WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); } function WritableStreamDefaultControllerGetDesiredSize(controller) { - const queueSize = GetTotalQueueSize(controller._queue); - return controller._strategyHWM - queueSize; + return controller._strategyHWM - controller._queueTotalSize; } function WritableStreamDefaultControllerWrite(controller, chunk) { @@ -614,7 +615,7 @@ function WritableStreamDefaultControllerWrite(controller, chunk) { const lastBackpressure = WritableStreamDefaultControllerGetBackpressure(controller); try { - EnqueueValueWithSize(controller._queue, writeRecord, chunkSize); + EnqueueValueWithSize(controller, writeRecord, chunkSize); } catch (enqueueE) { WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueE); return; @@ -662,7 +663,7 @@ function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { return; } - const writeRecord = PeekQueueValue(controller._queue); + const writeRecord = PeekQueueValue(controller); if (writeRecord === 'close') { WritableStreamDefaultControllerProcessClose(controller); } else { @@ -682,7 +683,7 @@ function WritableStreamDefaultControllerProcessClose(controller) { assert(stream._state === 'closing', 'can\'t process final write record unless already closed'); - DequeueValue(controller._queue); + DequeueValue(controller); assert(controller._queue.length === 0, 'queue must be empty once the final write record is dequeued'); controller._inClose = true; @@ -743,7 +744,7 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) { return; } const lastBackpressure = WritableStreamDefaultControllerGetBackpressure(controller); - DequeueValue(controller._queue); + DequeueValue(controller); if (state !== 'closing') { const backpressure = WritableStreamDefaultControllerGetBackpressure(controller); if (lastBackpressure !== backpressure) { @@ -787,8 +788,7 @@ function WritableStreamDefaultControllerError(controller, e) { assert(stream._state === 'writable' || stream._state === 'closing'); WritableStreamError(stream, e); - - controller._queue = []; + ResetQueue(controller); } // Helper functions for the WritableStream. diff --git a/reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.https.html b/reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.https.html new file mode 100644 index 000000000..fc91ce597 --- /dev/null +++ b/reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.https.html @@ -0,0 +1,14 @@ + + + + + + + + + + + diff --git a/reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.js b/reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.js new file mode 100644 index 000000000..ab0caef46 --- /dev/null +++ b/reference-implementation/to-upstream-wpts/readable-streams/floating-point-total-queue-size.js @@ -0,0 +1,86 @@ +'use strict'; + +if (self.importScripts) { + self.importScripts('/resources/testharness.js'); +} + +promise_test(() => { + const { reader, controller } = setupTestStream(); + + controller.enqueue(2); + assert_equals(controller.desiredSize, 0 - 2, 'desiredSize must be -2 after enqueueing such a chunk'); + + controller.enqueue(Number.MAX_SAFE_INTEGER); + assert_equals(controller.desiredSize, 0 - Number.MAX_SAFE_INTEGER - 2, + 'desiredSize must be calculated using floating-point arithmetic (adding a second chunk)'); + + return reader.read().then(() => { + assert_equals(controller.desiredSize, 0 - Number.MAX_SAFE_INTEGER - 2 + 2, + 'desiredSize must be calculated using floating-point arithmetic (subtracting a chunk)'); + + return reader.read(); + }).then(() => { + assert_equals(controller.desiredSize, 0 - Number.MAX_SAFE_INTEGER - 2 + 2 + Number.MAX_SAFE_INTEGER, + 'desiredSize must be calculated using floating-point arithmetic (subtracting a second chunk)'); + }); +}, 'Floating point arithmetic must manifest near NUMBER.MAX_SAFE_INTEGER (total ends up positive)'); + +promise_test(() => { + const { reader, controller } = setupTestStream(); + + controller.enqueue(1e-16); + assert_equals(controller.desiredSize, 0 - 1e-16, 'desiredSize must be -1e16 after enqueueing such a chunk'); + + controller.enqueue(1); + assert_equals(controller.desiredSize, 0 - 1e-16 - 1, + 'desiredSize must be calculated using floating-point arithmetic (adding a second chunk)'); + + return reader.read().then(() => { + assert_equals(controller.desiredSize, 0 - 1e-16 - 1 + 1e-16, + 'desiredSize must be calculated using floating-point arithmetic (subtracting a chunk)'); + + return reader.read(); + }).then(() => { + assert_equals(controller.desiredSize, 0 - 1e-16 - 1 + 1e-16 + 1, + 'desiredSize must be calculated using floating-point arithmetic (subtracting a second chunk)'); + }); +}, 'Floating point arithmetic must manifest near 0 (total ends up positive)'); + +promise_test(() => { + const { reader, controller } = setupTestStream(); + + controller.enqueue(2e-16); + assert_equals(controller.desiredSize, 0 - 2e-16, 'desiredSize must be -2e16 after enqueueing such a chunk'); + + controller.enqueue(1); + assert_equals(controller.desiredSize, 0 - 2e-16 - 1, + 'desiredSize must be calculated using floating-point arithmetic (adding a second chunk)'); + + return reader.read().then(() => { + assert_equals(controller.desiredSize, 0 - 2e-16 - 1 + 2e-16, + 'desiredSize must be calculated using floating-point arithmetic (subtracting a chunk)'); + + return reader.read(); + }).then(() => { + assert_equals(controller.desiredSize, 0 - 2e-16 - 1 + 2e-16 + 1, + 'desiredSize must be calculated using floating-point arithmetic (subtracting a second chunk)'); + }); +}, 'Floating point arithmetic must manifest near 0 (total ends up zero)'); + +function setupTestStream() { + const strategy = { + size(x) { + return x; + }, + highWaterMark: 0 + }; + + let controller; + const rs = new ReadableStream({ + start(c) { + controller = c; + } + }, strategy); + + return { reader: rs.getReader(), controller }; +} diff --git a/reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.https.html b/reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.https.html new file mode 100644 index 000000000..fc91ce597 --- /dev/null +++ b/reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.https.html @@ -0,0 +1,14 @@ + + + + + + + + + + + diff --git a/reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.js b/reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.js new file mode 100644 index 000000000..d7ed25976 --- /dev/null +++ b/reference-implementation/to-upstream-wpts/writable-streams/floating-point-total-queue-size.js @@ -0,0 +1,69 @@ +'use strict'; + +if (self.importScripts) { + self.importScripts('/resources/testharness.js'); +} + +promise_test(() => { + const writer = setupTestStream(); + + const writePromises = [ + writer.write(2), + writer.write(Number.MAX_SAFE_INTEGER) + ]; + + assert_equals(writer.desiredSize, 0 - 2 - Number.MAX_SAFE_INTEGER, + 'desiredSize must be calculated using floating-point arithmetic (after writing two chunks)'); + + return Promise.all(writePromises).then(() => { + assert_equals(writer.desiredSize, 0 - 2 - Number.MAX_SAFE_INTEGER + 2 + Number.MAX_SAFE_INTEGER, + 'desiredSize must be calculated using floating-point arithmetic (after the two chunks have finished writing)'); + }); +}, 'Floating point arithmetic must manifest near NUMBER.MAX_SAFE_INTEGER (total ends up positive)'); + +promise_test(() => { + const writer = setupTestStream(); + + const writePromises = [ + writer.write(1e-16), + writer.write(1) + ]; + + assert_equals(writer.desiredSize, 0 - 1e-16 - 1, + 'desiredSize must be calculated using floating-point arithmetic (after writing two chunks)'); + + return Promise.all(writePromises).then(() => { + assert_equals(writer.desiredSize, 0 - 1e-16 - 1 + 1e-16 + 1, + 'desiredSize must be calculated using floating-point arithmetic (after the two chunks have finished writing)'); + }); +}, 'Floating point arithmetic must manifest near 0 (total ends up positive)'); + +promise_test(() => { + const writer = setupTestStream(); + + const writePromises = [ + writer.write(2e-16), + writer.write(1) + ]; + + assert_equals(writer.desiredSize, 0 - 2e-16 - 1, + 'desiredSize must be calculated using floating-point arithmetic (after writing two chunks)'); + + return Promise.all(writePromises).then(() => { + assert_equals(writer.desiredSize, 0 - 2e-16 - 1 + 2e-16 + 1, + 'desiredSize must be calculated using floating-point arithmetic (after the two chunks have finished writing)'); + }); +}, 'Floating point arithmetic must manifest near 0 (total ends up zero)'); + +function setupTestStream() { + const strategy = { + size(x) { + return x; + }, + highWaterMark: 0 + }; + + const ws = new WritableStream({}, strategy); + + return ws.getWriter(); +}