Skip to content

Commit

Permalink
Non-working attempt to implement abort via queue
Browse files Browse the repository at this point in the history
In whatwg#634 (comment),
@domenic proposed an alternative way of postponing abort() until after
pending sink writes or closes are complete. The abort signal would be
added to the queue in place of its current contents, and naturally
processed after the current operation completed.

This change attempts to implement that.

Unfortunately, it is not working.
  • Loading branch information
ricea committed Feb 15, 2017
1 parent af58856 commit e293881
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 24 deletions.
9 changes: 9 additions & 0 deletions reference-implementation/lib/queue-with-sizes.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ exports.EnqueueValueWithSize = (container, value, size) => {
container._queueTotalSize += size;
};

exports.PeekQueueValue = container => {
assert('_queue' in container && '_queueTotalSize' in container,
'Spec-level failure: PeekQueueValue should only be used on containers with [[queue]] and [[queueTotalSize]].');
assert(container._queue.length > 0, 'Spec-level failure: should never peek at an empty queue.');

const pair = container._queue[0];
return pair.value;
};

exports.ResetQueue = container => {
assert('_queue' in container && '_queueTotalSize' in container,
'Spec-level failure: ResetQueue should only be used on containers with [[queue]] and [[queueTotalSize]].');
Expand Down
59 changes: 35 additions & 24 deletions reference-implementation/lib/writable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const assert = require('assert');
const { InvokeOrNoop, PromiseInvokeOrNoop, ValidateAndNormalizeQueuingStrategy, typeIsObject } =
require('./helpers.js');
const { rethrowAssertionErrorRejection } = require('./utils.js');
const { DequeueValue, EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { DequeueValue, EnqueueValueWithSize, PeekQueueValue, ResetQueue } = require('./queue-with-sizes.js');

class WritableStream {
constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) {
Expand Down Expand Up @@ -127,29 +127,29 @@ function WritableStreamAbort(stream, reason) {
const controller = stream._writableStreamController;
assert(controller !== undefined, 'controller must not be undefined');

if (stream._writer !== undefined) {
let readyPromiseIsPending = false;
if (state === 'writable' &&
WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true) {
readyPromiseIsPending = true;
}
WritableStreamDefaultWriterEnsureReadyPromiseRejectedWith(stream._writer, error, readyPromiseIsPending);
}

// Warning: This will make WritableStreamDefaultControllerGetBackpressure() return false from now on.
ResetQueue(controller);
EnqueueValueWithSize(controller, {
action: 'abort',
value: reason
}, 0);

let readyPromiseIsPending = false;
if (state === 'writable' &&
WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true) {
readyPromiseIsPending = true;
}

const promise = new Promise((resolve, reject) => {
stream._pendingAbortRequest = {
_resolve: resolve,
_reject: reject
};
});

if (stream._writer !== undefined) {
WritableStreamDefaultWriterEnsureReadyPromiseRejectedWith(stream._writer, error, readyPromiseIsPending);
}

WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);

return promise;
Expand Down Expand Up @@ -252,6 +252,10 @@ function WritableStreamFinishPendingClose(stream) {
stream._storedError = new TypeError('Abort requested but closed successfully');

WritableStreamRejectClosedPromiseIfAny(stream);

if (wasAborted) {
WritableStreamDefaultControllerProcessAbort(stream_.writeableStreamController, stream._storedError);
}
}

function WritableStreamFinishPendingCloseWithError(stream, reason) {
Expand All @@ -266,9 +270,10 @@ function WritableStreamFinishPendingCloseWithError(stream, reason) {
wasAborted = true;
}

const controller = stream._writableStreamController;
let readyPromiseIsPending = false;
if (state === 'writable' && wasAborted === false &&
WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true) {
WritableStreamDefaultControllerGetBackpressure(controller) === true) {
readyPromiseIsPending = true;
}

Expand All @@ -288,6 +293,10 @@ function WritableStreamFinishPendingCloseWithError(stream, reason) {
}

WritableStreamRejectClosedPromiseIfAny(stream);

if (wasAborted) {
WritableStreamDefaultControllerProcessAbort(controller, stream._storedError);
}
}

function WritableStreamMarkFirstWriteRequestPending(stream) {
Expand Down Expand Up @@ -782,16 +791,20 @@ function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
return;
}

const queueActionRecord = DequeueValue(controller);
const queueActionRecord = PeekQueueValue(controller);
const action = queueActionRecord.action;
const value = queueActionRecord.value;
if (action === 'write') {
// For correct handling of backpressure, the chunk must not be dequeued until the sink write() completes.
WritableStreamDefaultControllerProcessWrite(controller, value);
return;
}
DequeueValue(controller);
if (action === 'close') {
WritableStreamDefaultControllerProcessClose(controller);
} else if (action === 'abort') {
WritableStreamDefaultControllerProcessAbort(controller, value);
} else {
assert(action === 'write');
WritableStreamDefaultControllerProcessWrite(controller, value);
assert(action === 'abort');
WritableStreamDefaultControllerProcessAbort(controller, value);
}
}

Expand Down Expand Up @@ -891,6 +904,7 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) {
assert(state === 'closing' || state === 'writable');

const oldBackpressure = WritableStreamDefaultControllerGetBackpressure(controller);
DequeueValue(controller);
WritableStreamDefaultControllerUpdateBackpressureIfNeeded(controller, oldBackpressure);

WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
Expand All @@ -904,6 +918,10 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) {
WritableStreamFinishPendingWriteWithError(stream, reason);

assert(stream._state === 'errored');
if (stream._pendingAbortRequest) {
WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
return;
}
if (wasErrored === false) {
controller._queue = [];
}
Expand Down Expand Up @@ -1007,22 +1025,19 @@ function defaultWriterClosedPromiseResolve(writer) {
}

function defaultWriterReadyPromiseInitialize(writer) {
console.trace('defaultWriterReadyPromiseInitialize');
writer._readyPromise = new Promise((resolve, reject) => {
writer._readyPromise_resolve = resolve;
writer._readyPromise_reject = reject;
});
}

function defaultWriterReadyPromiseInitializeAsResolved(writer) {
console.trace('defaultWriterReadyPromiseInitializeAsResolved');
writer._readyPromise = Promise.resolve(undefined);
writer._readyPromise_resolve = undefined;
writer._readyPromise_reject = undefined;
}

function defaultWriterReadyPromiseReject(writer, reason) {
console.trace('defaultWriterReadyPromiseReject');
assert(writer._readyPromise_resolve !== undefined);
assert(writer._readyPromise_reject !== undefined);

Expand All @@ -1032,8 +1047,6 @@ function defaultWriterReadyPromiseReject(writer, reason) {
}

function defaultWriterReadyPromiseReset(writer) {
console.trace('defaultWriterReadyPromiseReset');

assert(writer._readyPromise_resolve === undefined);
assert(writer._readyPromise_reject === undefined);

Expand All @@ -1044,15 +1057,13 @@ function defaultWriterReadyPromiseReset(writer) {
}

function defaultWriterReadyPromiseResetToRejected(writer, reason) {
console.trace('defaultWriterReadyPromiseResetToRejected');
assert(writer._readyPromise_resolve === undefined);
assert(writer._readyPromise_reject === undefined);

writer._readyPromise = Promise.reject(reason);
}

function defaultWriterReadyPromiseResolve(writer) {
console.trace('defaultWriterReadyPromiseResolve');
assert(writer._readyPromise_resolve !== undefined);
assert(writer._readyPromise_reject !== undefined);

Expand Down

0 comments on commit e293881

Please sign in to comment.