Skip to content

Commit

Permalink
Fix queue total size tracking logic
Browse files Browse the repository at this point in the history
Previously, we believed that the spec's inefficient-but-clear method, of adding up all the chunk sizes whenever it wanted to get the total queue size, was equivalent to keeping a running count. This was discussed in #582; note also #491 where we changed the reference implementation to the running-count strategy for speed reasons. As discovered in #582 (comment), and further elaborated on in #657, these two methods are not in fact equivalent, due to the vagaries of floating-point arithmetic. (That is, they are equivalent if you assume the addition is floating-point addition, but that is less than clear from the spec.)

As such, this commit switches the spec to the more efficient running-count method, as that's realistically the only implementable version. It also adds tests to ensure floating point arithmetic is being used, exposing such cases.

This commit also includes a few unobservable cleanups:

- It introduces the ResetQueue abstract operation, to ensure that the queue total size gets reset to zero when the queue is cleared. This should not matter because no code paths check the queue's total size after it has been cleared, but keeping the two slots in sync seems virtuous.

- It updates the internal slot name for ReadableByteStreamController instances from [[totalQueuedBytes]] to [[queueTotalSize]], to be consistent with those for the other queue-containers. We do not yet use the queue-with-sizes abstract operations (except ResetQueue) on ReadableByteStreamController instances as the queue management is significantly more complicated there. But aligning makes the spec easier to read.
  • Loading branch information
domenic committed Jan 18, 2017
1 parent f8ed321 commit b8c3455
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 120 deletions.
143 changes: 84 additions & 59 deletions index.bs

Large diffs are not rendered by default.

46 changes: 25 additions & 21 deletions reference-implementation/lib/queue-with-sizes.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,43 @@
const assert = require('assert');
const { IsFiniteNonNegativeNumber } = require('./helpers.js');

exports.DequeueValue = queue => {
assert(queue.length > 0, 'Spec-level failure: should never dequeue from an empty queue.');
const pair = queue.shift();
exports.DequeueValue = container => {
assert('_queue' in container && '_queueTotalSize' in container,
'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].');
assert(container._queue.length > 0, 'Spec-level failure: should never dequeue from an empty queue.');

queue._totalSize -= pair.size;
const pair = container._queue.shift();
container._queueTotalSize -= pair.size;

return pair.value;
};

exports.EnqueueValueWithSize = (queue, value, size) => {
exports.EnqueueValueWithSize = (container, value, size) => {
assert('_queue' in container && '_queueTotalSize' in container,
'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].');

size = Number(size);
if (!IsFiniteNonNegativeNumber(size)) {
throw new RangeError('Size must be a finite, non-NaN, non-negative number.');
}

queue.push({ value, size });

if (queue._totalSize === undefined) {
queue._totalSize = 0;
}
queue._totalSize += size;
container._queue.push({ value, size });
container._queueTotalSize += size;
};

// This implementation is not per-spec. Total size is cached for speed.
exports.GetTotalQueueSize = queue => {
if (queue._totalSize === undefined) {
queue._totalSize = 0;
}
return queue._totalSize;
};
exports.PeekQueueValue = container => {
assert('_queue' in container && '_queueTotalSize' in container,
'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].');
assert(container._queue.length > 0, 'Spec-level failure: should never peek at an empty queue.');

exports.PeekQueueValue = queue => {
assert(queue.length > 0, 'Spec-level failure: should never peek at an empty queue.');
const pair = queue[0];
const pair = container._queue[0];
return pair.value;
};

exports.ResetQueue = container => {
assert('_queue' in container && '_queueTotalSize' in container,
'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].');

container._queue = [];
container._queueTotalSize = 0;
};
53 changes: 26 additions & 27 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { ArrayBufferCopy, CreateIterResultObject, IsFiniteNonNegativeNumber, Invo
require('./helpers.js');
const { createArrayFromList, createDataProperty, typeIsObject } = require('./helpers.js');
const { rethrowAssertionErrorRejection } = require('./utils.js');
const { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize } = require('./queue-with-sizes.js');
const { DequeueValue, EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStream, IsWritableStreamLocked,
WritableStreamAbort, WritableStreamDefaultWriterCloseWithErrorPropagation,
WritableStreamDefaultWriterRelease, WritableStreamDefaultWriterWrite } = require('./writable-stream.js');
Expand Down Expand Up @@ -853,7 +853,10 @@ class ReadableStreamDefaultController {

this._underlyingSource = underlyingSource;

this._queue = [];
// Need to set the slots so that the assert doesn't fire. In the spec the slots already exist implicitly.
this._queue = this._queueTotalSize = undefined;
ResetQueue(this);

this._started = false;
this._closeRequested = false;
this._pullAgain = false;
Expand Down Expand Up @@ -938,16 +941,15 @@ class ReadableStreamDefaultController {
}

[InternalCancel](reason) {
this._queue = [];

ResetQueue(this);
return PromiseInvokeOrNoop(this._underlyingSource, 'cancel', [reason]);
}

[InternalPull]() {
const stream = this._controlledReadableStream;

if (this._queue.length > 0) {
const chunk = DequeueValue(this._queue);
const chunk = DequeueValue(this);

if (this._closeRequested === true && this._queue.length === 0) {
ReadableStreamClose(stream);
Expand Down Expand Up @@ -1077,7 +1079,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
}

try {
EnqueueValueWithSize(controller._queue, chunk, chunkSize);
EnqueueValueWithSize(controller, chunk, chunkSize);
} catch (enqueueE) {
ReadableStreamDefaultControllerErrorIfNeeded(controller, enqueueE);
throw enqueueE;
Expand All @@ -1094,7 +1096,7 @@ function ReadableStreamDefaultControllerError(controller, e) {

assert(stream._state === 'readable');

controller._queue = [];
ResetQueue(controller);

ReadableStreamError(stream, e);
}
Expand All @@ -1106,8 +1108,7 @@ function ReadableStreamDefaultControllerErrorIfNeeded(controller, e) {
}

function ReadableStreamDefaultControllerGetDesiredSize(controller) {
const queueSize = GetTotalQueueSize(controller._queue);
return controller._strategyHWM - queueSize;
return controller._strategyHWM - controller._queueTotalSize;
}

class ReadableStreamBYOBRequest {
Expand Down Expand Up @@ -1171,11 +1172,11 @@ class ReadableByteStreamController {

ReadableByteStreamControllerClearPendingPullIntos(this);

this._queue = [];
this._totalQueuedBytes = 0;
// Need to set the slots so that the assert doesn't fire. In the spec the slots already exist implicitly.
this._queue = this._queueTotalSize = undefined;
ResetQueue(this);

this._closeRequested = false;

this._started = false;

this._strategyHWM = ValidateAndNormalizeHighWaterMark(highWaterMark);
Expand Down Expand Up @@ -1293,8 +1294,7 @@ class ReadableByteStreamController {
firstDescriptor.bytesFilled = 0;
}

this._queue = [];
this._totalQueuedBytes = 0;
ResetQueue(this);

return PromiseInvokeOrNoop(this._underlyingByteSource, 'cancel', [reason]);
}
Expand All @@ -1304,9 +1304,9 @@ class ReadableByteStreamController {
assert(ReadableStreamHasDefaultReader(stream) === true);

if (ReadableStreamGetNumReadRequests(stream) === 0) {
if (this._totalQueuedBytes > 0) {
if (this._queueTotalSize > 0) {
const entry = this._queue.shift();
this._totalQueuedBytes -= entry.byteLength;
this._queueTotalSize -= entry.byteLength;

ReadableByteStreamControllerHandleQueueDrain(this);

Expand Down Expand Up @@ -1452,15 +1452,15 @@ function ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescripto

function ReadableByteStreamControllerEnqueueChunkToQueue(controller, buffer, byteOffset, byteLength) {
controller._queue.push({ buffer, byteOffset, byteLength });
controller._totalQueuedBytes += byteLength;
controller._queueTotalSize += byteLength;
}

function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) {
const elementSize = pullIntoDescriptor.elementSize;

const currentAlignedBytes = pullIntoDescriptor.bytesFilled - pullIntoDescriptor.bytesFilled % elementSize;

const maxBytesToCopy = Math.min(controller._totalQueuedBytes,
const maxBytesToCopy = Math.min(controller._queueTotalSize,
pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled);
const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
const maxAlignedBytes = maxBytesFilled - maxBytesFilled % elementSize;
Expand Down Expand Up @@ -1488,15 +1488,15 @@ function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller,
headOfQueue.byteOffset += bytesToCopy;
headOfQueue.byteLength -= bytesToCopy;
}
controller._totalQueuedBytes -= bytesToCopy;
controller._queueTotalSize -= bytesToCopy;

ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor);

totalBytesToCopyRemaining -= bytesToCopy;
}

if (ready === false) {
assert(controller._totalQueuedBytes === 0, 'queue must be empty');
assert(controller._queueTotalSize === 0, 'queue must be empty');
assert(pullIntoDescriptor.bytesFilled > 0);
assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);
}
Expand All @@ -1514,7 +1514,7 @@ function ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, size
function ReadableByteStreamControllerHandleQueueDrain(controller) {
assert(controller._controlledReadableStream._state === 'readable');

if (controller._totalQueuedBytes === 0 && controller._closeRequested === true) {
if (controller._queueTotalSize === 0 && controller._closeRequested === true) {
ReadableStreamClose(controller._controlledReadableStream);
} else {
ReadableByteStreamControllerCallPullIfNeeded(controller);
Expand All @@ -1535,7 +1535,7 @@ function ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(contro
assert(controller._closeRequested === false);

while (controller._pendingPullIntos.length > 0) {
if (controller._totalQueuedBytes === 0) {
if (controller._queueTotalSize === 0) {
return;
}

Expand Down Expand Up @@ -1585,7 +1585,7 @@ function ReadableByteStreamControllerPullInto(controller, view) {
return Promise.resolve(CreateIterResultObject(emptyView, true));
}

if (controller._totalQueuedBytes > 0) {
if (controller._queueTotalSize > 0) {
if (ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) === true) {
const filledView = ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor);

Expand Down Expand Up @@ -1716,7 +1716,7 @@ function ReadableByteStreamControllerClose(controller) {
assert(controller._closeRequested === false);
assert(stream._state === 'readable');

if (controller._totalQueuedBytes > 0) {
if (controller._queueTotalSize > 0) {
controller._closeRequested = true;

return;
Expand Down Expand Up @@ -1772,13 +1772,12 @@ function ReadableByteStreamControllerError(controller, e) {

ReadableByteStreamControllerClearPendingPullIntos(controller);

controller._queue = [];

ResetQueue(controller);
ReadableStreamError(stream, e);
}

function ReadableByteStreamControllerGetDesiredSize(controller) {
return controller._strategyHWM - controller._totalQueuedBytes;
return controller._strategyHWM - controller._queueTotalSize;
}

function ReadableByteStreamControllerRespond(controller, bytesWritten) {
Expand Down
26 changes: 13 additions & 13 deletions reference-implementation/lib/writable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const assert = require('assert');
const { InvokeOrNoop, PromiseInvokeOrNoop, ValidateAndNormalizeQueuingStrategy, typeIsObject } =
require('./helpers.js');
const { rethrowAssertionErrorRejection } = require('./utils.js');
const { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize, PeekQueueValue } = require('./queue-with-sizes.js');
const { DequeueValue, EnqueueValueWithSize, PeekQueueValue, ResetQueue } = require('./queue-with-sizes.js');

class WritableStream {
constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) {
Expand Down Expand Up @@ -528,7 +528,10 @@ class WritableStreamDefaultController {

this._underlyingSink = underlyingSink;

this._queue = [];
// Need to set the slots so that the assert doesn't fire. In the spec the slots already exist implicitly.
this._queue = this._queueTotalSize = undefined;
ResetQueue(this);

this._started = false;
this._writing = false;
this._inClose = false;
Expand Down Expand Up @@ -575,20 +578,18 @@ class WritableStreamDefaultController {
// Abstract operations implementing interface required by the WritableStream.

function WritableStreamDefaultControllerAbort(controller, reason) {
controller._queue = [];

ResetQueue(controller);
const sinkAbortPromise = PromiseInvokeOrNoop(controller._underlyingSink, 'abort', [reason]);
return sinkAbortPromise.then(() => undefined);
}

function WritableStreamDefaultControllerClose(controller) {
EnqueueValueWithSize(controller._queue, 'close', 0);
EnqueueValueWithSize(controller, 'close', 0);
WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}

function WritableStreamDefaultControllerGetDesiredSize(controller) {
const queueSize = GetTotalQueueSize(controller._queue);
return controller._strategyHWM - queueSize;
return controller._strategyHWM - controller._queueTotalSize;
}

function WritableStreamDefaultControllerWrite(controller, chunk) {
Expand All @@ -614,7 +615,7 @@ function WritableStreamDefaultControllerWrite(controller, chunk) {
const lastBackpressure = WritableStreamDefaultControllerGetBackpressure(controller);

try {
EnqueueValueWithSize(controller._queue, writeRecord, chunkSize);
EnqueueValueWithSize(controller, writeRecord, chunkSize);
} catch (enqueueE) {
WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueE);
return;
Expand Down Expand Up @@ -662,7 +663,7 @@ function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
return;
}

const writeRecord = PeekQueueValue(controller._queue);
const writeRecord = PeekQueueValue(controller);
if (writeRecord === 'close') {
WritableStreamDefaultControllerProcessClose(controller);
} else {
Expand All @@ -682,7 +683,7 @@ function WritableStreamDefaultControllerProcessClose(controller) {

assert(stream._state === 'closing', 'can\'t process final write record unless already closed');

DequeueValue(controller._queue);
DequeueValue(controller);
assert(controller._queue.length === 0, 'queue must be empty once the final write record is dequeued');

controller._inClose = true;
Expand Down Expand Up @@ -743,7 +744,7 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) {
return;
}
const lastBackpressure = WritableStreamDefaultControllerGetBackpressure(controller);
DequeueValue(controller._queue);
DequeueValue(controller);
if (state !== 'closing') {
const backpressure = WritableStreamDefaultControllerGetBackpressure(controller);
if (lastBackpressure !== backpressure) {
Expand Down Expand Up @@ -787,8 +788,7 @@ function WritableStreamDefaultControllerError(controller, e) {
assert(stream._state === 'writable' || stream._state === 'closing');

WritableStreamError(stream, e);

controller._queue = [];
ResetQueue(controller);
}

// Helper functions for the WritableStream.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<!DOCTYPE html>
<meta charset="utf-8">
<script src="/resources/testharness.js"></script>
<script src="/resources/testharnessreport.js"></script>
<script src="/service-workers/service-worker/resources/test-helpers.sub.js"></script>
<script src="../resources/recording-streams.js"></script>
<script src="../resources/test-initializer.js"></script>

<script src="../resources/test-utils.js"></script>
<script src="floating-point-total-queue-size.js"></script>
<script>
'use strict';
worker_test('floating-point-total-queue-size.js');
</script>
Loading

0 comments on commit b8c3455

Please sign in to comment.