From 591a6eda9ea0b84881a370519d6aca53fab9e256 Mon Sep 17 00:00:00 2001 From: Adam Rice Date: Sun, 4 Dec 2016 22:54:18 -0500 Subject: [PATCH] Make write() and close() non-interruptible (#619) The underlying abort() method will now not be called until any pending underlying write() has finished. If an underlying close() is in progress then abort() will no longer be called at all. Other behavioural changes: * If the underlying operation has started, then the writer's write() and close() methods will always reflect the result of the underlying operation, rather than being rejected because abort() was called. * Consistently with the above, if an underlying operation calls controller.error() then the writer method will still reflect the result from the underlying operation. * If a call to writer.abort() has to wait for an underlying write() or close() to complete, and that underlying operation rejects, then the underlying abort() will not be called, and writer.abort() will return the rejection from the failed operation. --- index.bs | 133 ++++++++-- .../lib/transform-stream.js | 5 +- .../lib/writable-stream.js | 156 ++++++++++-- .../piping/error-propagation-backward.js | 5 +- .../writable-streams/aborting.js | 232 +++++++++++++++++- .../writable-streams/close.js | 5 +- .../writable-streams/constructor.js | 4 +- 7 files changed, 472 insertions(+), 68 deletions(-) diff --git a/index.bs b/index.bs index c27dbe2c4..d3a1915c3 100644 --- a/index.bs +++ b/index.bs @@ -2613,6 +2613,18 @@ Instances of {{WritableStream}} are created with the internal slots described in \[[writeRequests]] A List of promises representing the stream's internal queue of pending writes + + \[[pendingWriteRequest]] + The promise for the current pending write operation + + + \[[pendingCloseRequest]] + The promise returned from the writer {{WritableStreamDefaultWriter/close()}} method + + + \[[pendingAbortRequest]] + The promise for a pending abort operation +

new @@ -2758,6 +2770,14 @@ writable stream is locked to a writer. 1. Assert: _state_ is `"writable"` or `"closing"`. 1. Let _error_ be a new *TypeError* indicating that the stream has been aborted. 1. Perform ! WritableStreamError(_stream_, _error_). + 1. Let _controller_ be _stream_.[[writableStreamController]]. + 1. Assert: _controller_ is not *undefined*. + 1. If _controller_.[[writing]] is *true* or _controller_.[[inClose]] is *true*, + 1. Set _stream_.[[pendingAbortRequest]] to a new promise. + 1. If _controller_.[[writing]] is *true*, return the result of transforming _stream_.[[pendingAbortRequest]] by a + fulfillment handler that returns ! + WritableStreamDefaultControllerAbort(_stream_.[[writableStreamController]], _reason_). + 1. Otherwise, return _stream_.[[pendingAbortRequest]]. 1. Return ! WritableStreamDefaultControllerAbort(_stream_.[[writableStreamController]], _reason_). @@ -2788,32 +2808,58 @@ visible through the {{WritableStream}}'s public API. )

- 1. Let _state_ be _stream_.[[state]]. - 1. Assert: _state_ is `"writable"` or `"closing"`. - 1. Repeat for each _writeRequest_ that is an element of _stream_.[[writeRequests]], - 1. Reject _writeRequest_ with _e_. - 1. Set _stream_.[[writeRequests]] to an empty List. + 1. Let _oldState_ be _stream_.[[state]]. + 1. Assert: _oldState_ is `"writable"` or `"closing"`. + 1. Set _stream_.[[state]] to `"errored"`. + 1. Set _stream_.[[storedError]] to _e_. + 1. Let _controller_ be _stream_.[[writableStreamController]]. + 1. If _controller_ is *undefined*, or both _controller_.[[writing]] and _controller_.[[inClose]] are *false*, + perform ! WritableStreamRejectPromisesInReactionToError(_stream_). 1. Let _writer_ be _stream_.[[writer]]. 1. If _writer_ is not undefined, - 1. Reject _writer_.[[closedPromise]] with _e_. - 1. Set _writer_.[[closedPromise]].[[PromiseIsHandled]] to *true*. - 1. If _state_ is `"writable"` and ! + 1. If _oldState_ is `"writable"` and ! WritableStreamDefaultControllerGetBackpressure(_stream_.[[writableStreamController]]) is *true*, reject _writer_.[[readyPromise]] with _e_. 1. Otherwise, set _writer_.[[readyPromise]] to a promise rejected with _e_. 1. Set _writer_.[[readyPromise]].[[PromiseIsHandled]] to *true*. - 1. Set _stream_.[[state]] to `"errored"`. - 1. Set _stream_.[[storedError]] to _e_.

WritableStreamFinishClose ( stream )

- 1. Assert: _stream_.[[state]] is `"closing"`. + 1. Assert: _stream_.[[state]] is `"closing"` or `"errored"`. 1. Assert: _stream_.[[writer]] is not *undefined*. - 1. Set _stream_.[[state]] to `"closed"`. - 1. Resolve _stream_.[[writer]].[[closedPromise]] with *undefined*. + 1. If _stream_.[[state]] is `"closing"`, + 1. Resolve _writer_.[[closedPromise]] with *undefined*. + 1. Set _stream_.[[state]] to `"closed"`. + 1. Otherwise, + 1. Assert: _stream_.[[state]] is `"errored"`. + 1. Reject _writer_.[[closedPromise]] with _stream_.[[storedError]]. + 1. Set _writer_.[[closedPromise]].[[PromiseIsHandled]] to *true*. + 1. If _stream_.[[pendingAbortRequest]] is not *undefined*, + 1. Resolve _stream_.[[pendingAbortRequest]] with *undefined*. + 1. Set _stream_.[[pendingAbortRequest]] to *undefined*. + + +

WritableStreamRejectPromisesInReactionToError ( stream )

+ + + 1. Assert: _stream_.[[state]] is `"errored"`. + 1. Assert: _stream_.[[pendingWriteRequest]] is *undefined*. + 1. Let _storedError_ be _stream_.[[storedError]]. + 1. Repeat for each _writeRequest_ that is an element of _stream_.[[writeRequests]], + 1. Reject _writeRequest_ with _storedError_. + 1. Set _stream_.[[writeRequests]] to an empty List. + 1. If _stream_.[[pendingCloseRequest]] is not *undefined*, + 1. Assert: _stream_.[[writableStreamController]].[[inClose]] is *false*. + 1. Reject _stream_.[[pendingCloseRequest]] with _storedError_. + 1. Set _stream_.[[pendingCloseRequest]] to *undefined*. + 1. Let _writer_ be _stream_.[[writer]]. + 1. If _writer_ is not undefined, + 1. Reject _writer_.[[closedPromise]] with _storedError_. + 1. Set _writer_.[[closedPromise]].[[PromiseIsHandled]] to *true*.

WritableStreamDefaultWriterClose ( writer )

1. Let _state_ be _stream_.[[state]]. 1. If _state_ is `"closed"` or `"errored"`, return a promise rejected with a *TypeError* exception. 1. Assert: _state_ is `"writable"`. - 1. Let _promise_ be ! WritableStreamAddWriteRequest(_stream_). + 1. Set _stream_.[[pendingCloseRequest]] to a new promise. 1. If ! WritableStreamDefaultControllerGetBackpressure(_stream_.[[writableStreamController]]) is *true*, resolve _writer_.[[readyPromise]] with *undefined*. 1. Set _stream_.[[state]] to `"closing"`. 1. Perform ! WritableStreamDefaultControllerClose(_stream_.[[writableStreamController]]). - 1. Return _promise_. + 1. Return _stream_.[[pendingCloseRequest]].

A boolean flag set to true while the underlying sink's write method is executing and has not yet fulfilled, used to prevent reentrant calls + + \[[inClose]] + A boolean flag set to true while the underlying sink's close method is + executing and has not yet fulfilled, used to prevent the {{WritableStreamDefaultWriter/abort()}} method from + interrupting close

stream, underlyingSink, WritableStreamDefaultControllerProcessClose ( controller )Upon fulfillment of _sinkClosePromise_, - 1. If _stream_.[[state]] is not `"closing"`, return. - 1. Perform ! WritableStreamFulfillWriteRequest(_stream_). + 1. Assert: _controller_.[[inClose]] is *true*. + 1. Set _controller_.[[inClose]] to *false*. + 1. If _stream_.[[state]] is not `"closing"` or `"errored"`, return. + 1. Assert: _stream_.[[pendingCloseRequest]] is not *undefined*. + 1. Resolve _stream_.[[pendingCloseRequest]] with *undefined*. + 1. Set _stream_.[[pendingCloseRequest]] to *undefined*. 1. Perform ! WritableStreamFinishClose(_stream_). 1. Upon rejection of _sinkClosePromise_ with reason _r_, + 1. Assert: _controller_.[[inClose]] it *true*. + 1. Set _controller_.[[InClose]] to *false*. + 1. Assert: _stream_.[[pendingCloseRequest]] is not *undefined*. + 1. Reject _stream_.[[pendingCloseRequest]] with _r_. + 1. Set _stream_.[[pendingCloseRequest]] to *undefined*. + 1. If _stream_.[[pendingAbortRequest]] is not *undefined*, + 1. Reject _stream_.[[pendingAbortRequest]] with _r_. + 1. Set _stream_.[[pendingAbortRequest]] to *undefined*. 1. Perform ! WritableStreamDefaultControllerErrorIfNeeded(_controller_, _r_). @@ -3373,14 +3437,28 @@ nothrow>WritableStreamDefaultControllerProcessWrite ( controller, 1. Set _controller_.[[writing]] to *true*. + 1. Let _stream_ be _controller_.[[controllerWritableStream]]. + 1. Assert: _stream_.[[pendingWriteRequest]] is undefined. + 1. Assert: _stream_.[[writeRequests]] is not empty. + 1. Let _writeRequest_ be the first element of _stream_.[[writeRequests]]. + 1. Remove _writeRequest_ from _stream_.[[writeRequests]], shifting all other elements downward (so that the second + becomes the first, and so on). + 1. Set _stream_.[[pendingWriteRequest]] to _writeRequest_. 1. Let _sinkWritePromise_ be ! PromiseInvokeOrNoop(_controller_.[[underlyingSink]], `"write"`, « ‍_chunk_, _controller_ »). 1. Upon fulfillment of _sinkWritePromise_, - 1. Let _stream_ be _controller_.[[controlledWritableStream]]. 1. Let _state_ be _stream_.[[state]]. - 1. If _state_ is `"errored"` or `"closed"`, return. + 1. Assert: _controller_.[[writing]] is *true*. 1. Set _controller_.[[writing]] to *false*. - 1. Perform ! WritableStreamFulfillWriteRequest(_stream_). + 1. Assert: _stream_.[[pendingWriteRequest]] is not *undefined*. + 1. Resolve _stream_.[[pendingWriteRequest]] with *undefined*. + 1. Set _stream_.[[pendingWriteRequest]] to *undefined*. + 1. If _state_ is `"errored"`, + 1. Perform ! WritableStreamRejectPromisesInReactionToError(_stream_). + 1. If _stream_.[[pendingAbortRequest]] is not *undefined*, + 1. Resolve _stream_.[[pendingAbortRequest]] with *undefined*. + 1. Set _stream_.[[pendingAbortRequest]] to *undefined*. + 1. Return. 1. Let _lastBackpressure_ be ! WritableStreamDefaultControllerGetBackpressure(_controller_). 1. Perform ! DequeueValue(_controller_.[[queue]]). 1. If _state_ is not `"closing"`, @@ -3388,7 +3466,18 @@ nothrow>WritableStreamDefaultControllerProcessWrite ( controller, Upon rejection of _sinkWritePromise_ with reason _r_, + 1. Upon rejection of _sinkWritePromise_ with _r_, + 1. Assert: _controller_.[[writing]] is *true*. + 1. Set _controller_.[[writing]] to *false*. + 1. Assert: _stream_.[[pendingWriteRequest]] is not *undefined*. + 1. Reject _stream_.[[pendingWriteRequest]] with _r_. + 1. Set _stream_.[[pendingWriteRequest]] to *undefined*. + 1. If _stream_.[[state]] is `"errored"`, + 1. Set _stream_.[[storedError]] to _r_. + 1. Perform ! WritableStreamRejectPromisesInReactionToError(_stream_). + 1. If _stream_.[[pendingAbortRequest]] is not *undefined*, + 1. Reject _stream_.[[pendingAbortRequest]] with _r_. + 1. Set _stream_.[[pendingAbortRequest]] to *undefined*. 1. Perform ! WritableStreamDefaultControllerErrorIfNeeded(_controller_, _r_). diff --git a/reference-implementation/lib/transform-stream.js b/reference-implementation/lib/transform-stream.js index cf0ab5154..148288d85 100644 --- a/reference-implementation/lib/transform-stream.js +++ b/reference-implementation/lib/transform-stream.js @@ -173,7 +173,10 @@ function TransformStreamTransform(transformStream, chunk) { return TransformStreamReadableReadyPromise(transformStream); }, - e => TransformStreamErrorIfNeeded(transformStream, e)); + e => { + TransformStreamErrorIfNeeded(transformStream, e); + return Promise.reject(e); + }); } function IsTransformStreamDefaultController(x) { diff --git a/reference-implementation/lib/writable-stream.js b/reference-implementation/lib/writable-stream.js index 29f69d89b..47a4a27ee 100644 --- a/reference-implementation/lib/writable-stream.js +++ b/reference-implementation/lib/writable-stream.js @@ -20,6 +20,17 @@ class WritableStream { // producer without waiting for the queued writes to finish. this._writeRequests = []; + // Write requests are removed from _writeRequests when write() is called on the underlying sink. This prevents + // them from being erroneously rejected on error. If a write() call is pending, the request is stored here. + this._pendingWriteRequest = undefined; + + // The promise that was returned from writer.close(). Stored here because it may be fulfilled after the writer + // has been detached. + this._pendingCloseRequest = undefined; + + // The promise that was returned from writer.abort(). This may also be fulfilled after the writer has detached. + this._pendingAbortRequest = undefined; + const type = underlyingSink.type; if (type !== undefined) { @@ -113,6 +124,23 @@ function WritableStreamAbort(stream, reason) { WritableStreamError(stream, error); + const controller = stream._writableStreamController; + assert(controller !== undefined); + if (controller._writing === true || controller._inClose === true) { + const promise = new Promise((resolve, reject) => { + const abortRequest = { + _resolve: resolve, + _reject: reject + }; + + stream._pendingAbortRequest = abortRequest; + }); + if (controller._writing === true) { + return promise.then(() => WritableStreamDefaultControllerAbort(stream._writableStreamController, reason)); + } + return promise; + } + return WritableStreamDefaultControllerAbort(stream._writableStreamController, reason); } @@ -135,20 +163,21 @@ function WritableStreamAddWriteRequest(stream) { } function WritableStreamError(stream, e) { - const state = stream._state; - assert(state === 'writable' || state === 'closing'); + const oldState = stream._state; + assert(oldState === 'writable' || oldState === 'closing'); + stream._state = 'errored'; + stream._storedError = e; - for (const writeRequest of stream._writeRequests) { - writeRequest._reject(e); + const controller = stream._writableStreamController; + // This method can be called during the construction of the controller, in which case "controller" will be undefined + // but the flags are guaranteed to be false anyway. + if (controller === undefined || controller._writing === false && controller._inClose === false) { + WritableStreamRejectPromisesInReactionToError(stream); } - stream._writeRequests = []; const writer = stream._writer; if (writer !== undefined) { - defaultWriterClosedPromiseReject(writer, e); - writer._closedPromise.catch(() => {}); - - if (state === 'writable' && + if (oldState === 'writable' && WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true) { defaultWriterReadyPromiseReject(writer, e); } else { @@ -156,24 +185,47 @@ function WritableStreamError(stream, e) { } writer._readyPromise.catch(() => {}); } - - stream._state = 'errored'; - stream._storedError = e; } function WritableStreamFinishClose(stream) { - assert(stream._state === 'closing'); + assert(stream._state === 'closing' || stream._state === 'errored'); - stream._state = 'closed'; + if (stream._state === 'closing') { + defaultWriterClosedPromiseResolve(stream._writer); + stream._state = 'closed'; + } else { + assert(stream._state === 'errored'); + defaultWriterClosedPromiseReject(stream._writer, stream._storedError); + stream._writer._closedPromise.catch(() => {}); + } - defaultWriterClosedPromiseResolve(stream._writer); + if (stream._pendingAbortRequest !== undefined) { + stream._pendingAbortRequest._resolve(); + stream._pendingAbortRequest = undefined; + } } -function WritableStreamFulfillWriteRequest(stream) { - assert(stream._writeRequests.length > 0); +function WritableStreamRejectPromisesInReactionToError(stream) { + assert(stream._state === 'errored'); + assert(stream._pendingWriteRequest === undefined); + + const storedError = stream._storedError; + for (const writeRequest of stream._writeRequests) { + writeRequest._reject(storedError); + } + stream._writeRequests = []; - const writeRequest = stream._writeRequests.shift(); - writeRequest._resolve(undefined); + if (stream._pendingCloseRequest !== undefined) { + assert(stream._writableStreamController._inClose === false); + stream._pendingCloseRequest._reject(storedError); + stream._pendingCloseRequest = undefined; + } + + const writer = stream._writer; + if (writer !== undefined) { + defaultWriterClosedPromiseReject(writer, storedError); + writer._closedPromise.catch(() => {}); + } } function WritableStreamUpdateBackpressure(stream, backpressure) { @@ -355,7 +407,14 @@ function WritableStreamDefaultWriterClose(writer) { assert(state === 'writable'); - const promise = WritableStreamAddWriteRequest(stream); + const promise = new Promise((resolve, reject) => { + const closeRequest = { + _resolve: resolve, + _reject: reject + }; + + stream._pendingCloseRequest = closeRequest; + }); if (WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true) { defaultWriterReadyPromiseResolve(writer); @@ -469,6 +528,7 @@ class WritableStreamDefaultController { this._queue = []; this._started = false; this._writing = false; + this._inClose = false; const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark); this._strategySize = normalizedStrategy.size; @@ -622,17 +682,32 @@ function WritableStreamDefaultControllerProcessClose(controller) { DequeueValue(controller._queue); assert(controller._queue.length === 0, 'queue must be empty once the final write record is dequeued'); + controller._inClose = true; const sinkClosePromise = PromiseInvokeOrNoop(controller._underlyingSink, 'close', [controller]); sinkClosePromise.then( () => { - if (stream._state !== 'closing') { + assert(controller._inClose === true); + controller._inClose = false; + if (stream._state !== 'closing' && stream._state !== 'errored') { return; } - WritableStreamFulfillWriteRequest(stream); + assert(stream._pendingCloseRequest !== undefined); + stream._pendingCloseRequest._resolve(undefined); + stream._pendingCloseRequest = undefined; + WritableStreamFinishClose(stream); }, r => { + assert(controller._inClose === true); + controller._inClose = false; + assert(stream._pendingCloseRequest !== undefined); + stream._pendingCloseRequest._reject(r); + stream._pendingCloseRequest = undefined; + if (stream._pendingAbortRequest !== undefined) { + stream._pendingAbortRequest._reject(r); + stream._pendingAbortRequest = undefined; + } WritableStreamDefaultControllerErrorIfNeeded(controller, r); } ) @@ -642,19 +717,32 @@ function WritableStreamDefaultControllerProcessClose(controller) { function WritableStreamDefaultControllerProcessWrite(controller, chunk) { controller._writing = true; + const stream = controller._controlledWritableStream; + + assert(stream._pendingWriteRequest === undefined); + assert(stream._writeRequests.length !== 0); + stream._pendingWriteRequest = stream._writeRequests.shift(); const sinkWritePromise = PromiseInvokeOrNoop(controller._underlyingSink, 'write', [chunk, controller]); sinkWritePromise.then( () => { - const stream = controller._controlledWritableStream; const state = stream._state; - if (state === 'errored' || state === 'closed') { - return; - } + assert(controller._writing === true); controller._writing = false; - WritableStreamFulfillWriteRequest(stream); + assert(stream._pendingWriteRequest !== undefined); + stream._pendingWriteRequest._resolve(undefined); + stream._pendingWriteRequest = undefined; + + if (state === 'errored') { + WritableStreamRejectPromisesInReactionToError(stream); + if (stream._pendingAbortRequest !== undefined) { + stream._pendingAbortRequest._resolve(); + stream._pendingAbortRequest = undefined; + } + return; + } const lastBackpressure = WritableStreamDefaultControllerGetBackpressure(controller); DequeueValue(controller._queue); if (state !== 'closing') { @@ -667,6 +755,20 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) { WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); }, r => { + assert(controller._writing === true); + controller._writing = false; + + assert(stream._pendingWriteRequest !== undefined); + stream._pendingWriteRequest._reject(r); + stream._pendingWriteRequest = undefined; + if (stream._state === 'errored') { + stream._storedError = r; + WritableStreamRejectPromisesInReactionToError(stream); + } + if (stream._pendingAbortRequest !== undefined) { + stream._pendingAbortRequest._reject(r); + stream._pendingAbortRequest = undefined; + } WritableStreamDefaultControllerErrorIfNeeded(controller, r); } ) diff --git a/reference-implementation/to-upstream-wpts/piping/error-propagation-backward.js b/reference-implementation/to-upstream-wpts/piping/error-propagation-backward.js index 7c076d536..1be82c2c0 100644 --- a/reference-implementation/to-upstream-wpts/piping/error-propagation-backward.js +++ b/reference-implementation/to-upstream-wpts/piping/error-propagation-backward.js @@ -611,8 +611,7 @@ promise_test(t => { const ws = recordingWritableStream({ write() { resolveWriteCalled(); - // never settles, which normally would hang the pipe, except we error via the controller. - return new Promise(() => {}); + return flushAsyncEvents(); } }); @@ -629,6 +628,6 @@ promise_test(t => { assert_array_equals(ws.events, ['write', 'a']); }); -}, 'Errors must be propagated backward: erroring via the controller errors a slow write'); +}, 'Errors must be propagated backward: erroring via the controller errors once pending write completes'); done(); diff --git a/reference-implementation/to-upstream-wpts/writable-streams/aborting.js b/reference-implementation/to-upstream-wpts/writable-streams/aborting.js index d98b6e4eb..cbb5228cb 100644 --- a/reference-implementation/to-upstream-wpts/writable-streams/aborting.js +++ b/reference-implementation/to-upstream-wpts/writable-streams/aborting.js @@ -75,8 +75,6 @@ promise_test(t => { }); }, 'Aborting a WritableStream immediately prevents future writes'); -// https://github.com/whatwg/streams/issues/611 -/* promise_test(t => { const ws = recordingWritableStream(); const results = []; @@ -91,19 +89,20 @@ promise_test(t => { promise_rejects(t, new TypeError(), writer.write(3), 'write(3) must reject with a TypeError') ); - writer.abort(); + const abortPromise = writer.abort(); results.push( promise_rejects(t, new TypeError(), writer.write(4), 'write(4) must reject with a TypeError'), promise_rejects(t, new TypeError(), writer.write(5), 'write(5) must reject with a TypeError') ); + + return abortPromise; }).then(() => { assert_array_equals(ws.events, ['write', 1, 'abort', undefined]); return Promise.all(results); }); }, 'Aborting a WritableStream prevents further writes after any that are in progress'); -*/ promise_test(() => { const ws = new WritableStream({ @@ -190,9 +189,12 @@ promise_test(t => { }, 'Closing but then immediately aborting a WritableStream causes the stream to error'); promise_test(t => { + let resolveClose; const ws = new WritableStream({ close() { - return new Promise(() => { }); // forever-pending + return new Promise(resolve => { + resolveClose = resolve; + }); } }); const writer = ws.getWriter(); @@ -201,11 +203,12 @@ promise_test(t => { return delay(0).then(() => { writer.abort(error1); - }) - .then(() => Promise.all([ - promise_rejects(t, new TypeError(), writer.closed, 'closed should reject with a TypeError'), - promise_rejects(t, new TypeError(), closePromise, 'close() should reject with a TypeError') - ])); + resolveClose(); + return Promise.all([ + promise_rejects(t, new TypeError(), writer.closed, 'closed should reject with a TypeError'), + closePromise + ]); + }); }, 'Closing a WritableStream and aborting it while it closes causes the stream to error'); promise_test(() => { @@ -255,4 +258,213 @@ promise_test(() => { return writer.abort().then(() => assert_true(thenCalled, 'then() should be called')); }, 'returning a thenable from abort() should work'); +promise_test(t => { + const ws = new WritableStream({ + write() { + return flushAsyncEvents(); + } + }); + const writer = ws.getWriter(); + return writer.ready.then(() => { + const writePromise = writer.write('a'); + writer.abort(error1); + let closedResolved = false; + return Promise.all([ + writePromise.then(() => assert_false(closedResolved, '.closed should not resolve before write()')), + promise_rejects(t, new TypeError(), writer.closed, '.closed should reject').then(() => { + closedResolved = true; + }) + ]); + }); +}, '.closed should not resolve before fulfilled write()'); + +promise_test(t => { + const ws = new WritableStream({ + write() { + return Promise.reject(error1); + } + }); + const writer = ws.getWriter(); + return writer.ready.then(() => { + const writePromise = writer.write('a'); + const abortPromise = writer.abort(error1); + let closedResolved = false; + return Promise.all([ + promise_rejects(t, error1, writePromise, 'write() should reject') + .then(() => assert_false(closedResolved, '.closed should not resolve before write()')), + promise_rejects(t, error1, writer.closed, '.closed should reject') + .then(() => { + closedResolved = true; + }), + promise_rejects(t, error1, abortPromise, 'abort() should reject')]); + }); +}, '.closed should not resolve before rejected write()'); + +promise_test(t => { + const ws = new WritableStream({ + write() { + return flushAsyncEvents(); + } + }, new CountQueuingStrategy(4)); + const writer = ws.getWriter(); + return writer.ready.then(() => { + const settlementOrder = []; + return Promise.all([ + writer.write('1').then(() => settlementOrder.push(1)), + promise_rejects(t, new TypeError(), writer.write('2'), 'first queued write should be rejected') + .then(() => settlementOrder.push(2)), + promise_rejects(t, new TypeError(), writer.write('3'), 'second queued write should be rejected') + .then(() => settlementOrder.push(3)), + writer.abort(error1) + ]).then(() => assert_array_equals([1, 2, 3], settlementOrder, 'writes should be satisfied in order')); + }); +}, 'writes should be satisfied in order when aborting'); + +promise_test(t => { + const ws = new WritableStream({ + write() { + return Promise.reject(error1); + } + }, new CountQueuingStrategy(4)); + const writer = ws.getWriter(); + return writer.ready.then(() => { + const settlementOrder = []; + return Promise.all([ + promise_rejects(t, error1, writer.write('1'), 'pending write should be rejected') + .then(() => settlementOrder.push(1)), + promise_rejects(t, error1, writer.write('2'), 'first queued write should be rejected') + .then(() => settlementOrder.push(2)), + promise_rejects(t, error1, writer.write('3'), 'second queued write should be rejected') + .then(() => settlementOrder.push(3)), + promise_rejects(t, error1, writer.abort(error1), 'abort should be rejected') + ]).then(() => assert_array_equals([1, 2, 3], settlementOrder, 'writes should be satisfied in order')); + }); +}, 'writes should be satisfied in order after rejected write when aborting'); + +promise_test(t => { + const ws = new WritableStream({ + write() { + return Promise.reject(error1); + } + }); + const writer = ws.getWriter(); + return writer.ready.then(() => { + return Promise.all([ + promise_rejects(t, error1, writer.write('a'), 'writer.write() should reject with error from underlying write()'), + promise_rejects(t, error1, writer.close(), 'writer.close() should reject with error from underlying write()'), + promise_rejects(t, error1, writer.abort(), 'writer.abort() should reject with error from underlying write()') + ]); + }); +}, 'close() should use error from underlying write() on abort'); + +promise_test(() => { + let resolveWrite; + let abort_called = false; + const ws = new WritableStream({ + write() { + return new Promise(resolve => { + resolveWrite = resolve; + }); + }, + abort() { + abort_called = true; + } + }); + + const writer = ws.getWriter(); + return writer.ready.then(() => { + writer.write('a'); + const abortPromise = writer.abort(); + return flushAsyncEvents().then(() => { + assert_false(abort_called, 'abort should not be called while write is pending'); + resolveWrite(); + return abortPromise.then(() => assert_true(abort_called, 'abort should be called')); + }); + }); +}, 'underlying abort() should not be called until underlying write() completes'); + +promise_test(() => { + let resolveClose; + let abort_called = false; + const ws = new WritableStream({ + close() { + return new Promise(resolve => { + resolveClose = resolve; + }); + }, + abort() { + abort_called = true; + } + }); + + const writer = ws.getWriter(); + return writer.ready.then(() => { + writer.close(); + const abortPromise = writer.abort(); + return flushAsyncEvents().then(() => { + assert_false(abort_called, 'abort should not be called while close is pending'); + resolveClose(); + return abortPromise.then(() => assert_false(abort_called, 'abort should not be called after close completes')); + }); + }); +}, 'underlying abort() should not be called if underlying close() has started'); + +promise_test(t => { + let resolveWrite; + let abort_called = false; + const ws = new WritableStream({ + write() { + return new Promise(resolve => { + resolveWrite = resolve; + }); + }, + abort() { + abort_called = true; + } + }); + + const writer = ws.getWriter(); + return writer.ready.then(() => { + writer.write('a'); + const closePromise = writer.close(); + const abortPromise = writer.abort(); + return flushAsyncEvents().then(() => { + assert_false(abort_called, 'abort should not be called while write is pending'); + resolveWrite(); + return abortPromise.then(() => { + assert_true(abort_called, 'abort should be called after write completes'); + return promise_rejects(t, new TypeError(), closePromise, 'promise returned by close() should be rejected'); + }); + }); + }); +}, 'underlying abort() should be called while closing if underlying close() has not started yet'); + +promise_test(() => { + const ws = new WritableStream(); + const writer = ws.getWriter(); + return writer.ready.then(() => { + const closePromise = writer.close(); + const abortPromise = writer.abort(); + let closeResolved = false; + Promise.all([ + closePromise.then(() => { closeResolved = true; }), + abortPromise.then(() => { assert_true(closeResolved, 'close() promise should resolve before abort() promise'); }) + ]); + }); +}, 'writer close() promise should resolve before abort() promise'); + +promise_test(t => { + const ws = new WritableStream({ + write(chunk, controller) { + controller.error(error1); + return new Promise(() => {}); + } + }); + const writer = ws.getWriter(); + return writer.ready.then(() => { + writer.write('a'); + return promise_rejects(t, error1, writer.ready, 'writer.ready should reject'); + }); +}, 'writer.ready should reject on controller error without waiting for underlying write'); + done(); diff --git a/reference-implementation/to-upstream-wpts/writable-streams/close.js b/reference-implementation/to-upstream-wpts/writable-streams/close.js index cbcad8dc2..fb1552379 100644 --- a/reference-implementation/to-upstream-wpts/writable-streams/close.js +++ b/reference-implementation/to-upstream-wpts/writable-streams/close.js @@ -33,7 +33,7 @@ promise_test(t => { const writer = ws.getWriter(); return Promise.all([ - promise_rejects(t, passedError, writer.close(), 'close() should be rejected with the passed error'), + writer.close(), delay(10).then(() => controller.error(passedError)), promise_rejects(t, passedError, writer.closed, 'closed promise should be rejected with the passed error'), @@ -51,8 +51,7 @@ promise_test(t => { const writer = ws.getWriter(); - return promise_rejects(t, passedError, writer.close(), 'close promise should be rejected with the passed error') - .then(() => promise_rejects(t, passedError, writer.closed, 'closed should stay rejected')); + return writer.close().then(() => promise_rejects(t, passedError, writer.closed, 'closed should stay rejected')); }, 'when sink calls error synchronously while closing, the stream should become errored'); promise_test(() => { diff --git a/reference-implementation/to-upstream-wpts/writable-streams/constructor.js b/reference-implementation/to-upstream-wpts/writable-streams/constructor.js index 30c7c70be..dc2bef9a1 100644 --- a/reference-implementation/to-upstream-wpts/writable-streams/constructor.js +++ b/reference-implementation/to-upstream-wpts/writable-streams/constructor.js @@ -36,7 +36,7 @@ promise_test(t => { const writer = ws.getWriter(); return Promise.all([ - promise_rejects(t, error1, writer.write('a'), 'write() should reject with the error'), + writer.write('a'), promise_rejects(t, error1, writer.closed, 'controller.error() in write() should errored the stream') ]); }, 'controller argument should be passed to write method'); @@ -51,7 +51,7 @@ promise_test(t => { const writer = ws.getWriter(); return Promise.all([ - promise_rejects(t, error1, writer.close(), 'close() should reject with the error'), + writer.close(), promise_rejects(t, error1, writer.closed, 'controller.error() in close() should error the stream') ]); }, 'controller argument should be passed to close method');