diff --git a/index.bs b/index.bs index c27dbe2c4..d3a1915c3 100644 --- a/index.bs +++ b/index.bs @@ -2613,6 +2613,18 @@ Instances of {{WritableStream}} are created with the internal slots described in \[[writeRequests]] A List of promises representing the stream's internal queue of pending writes + + \[[pendingWriteRequest]] + The promise for the current pending write operation + + + \[[pendingCloseRequest]] + The promise returned from the writer {{WritableStreamDefaultWriter/close()}} method + + + \[[pendingAbortRequest]] + The promise for a pending abort operation +

new @@ -2758,6 +2770,14 @@ writable stream is locked to a writer. 1. Assert: _state_ is `"writable"` or `"closing"`. 1. Let _error_ be a new *TypeError* indicating that the stream has been aborted. 1. Perform ! WritableStreamError(_stream_, _error_). + 1. Let _controller_ be _stream_.[[writableStreamController]]. + 1. Assert: _controller_ is not *undefined*. + 1. If _controller_.[[writing]] is *true* or _controller_.[[inClose]] is *true*, + 1. Set _stream_.[[pendingAbortRequest]] to a new promise. + 1. If _controller_.[[writing]] is *true*, return the result of transforming _stream_.[[pendingAbortRequest]] by a + fulfillment handler that returns ! + WritableStreamDefaultControllerAbort(_stream_.[[writableStreamController]], _reason_). + 1. Otherwise, return _stream_.[[pendingAbortRequest]]. 1. Return ! WritableStreamDefaultControllerAbort(_stream_.[[writableStreamController]], _reason_). @@ -2788,32 +2808,58 @@ visible through the {{WritableStream}}'s public API. )

- 1. Let _state_ be _stream_.[[state]]. - 1. Assert: _state_ is `"writable"` or `"closing"`. - 1. Repeat for each _writeRequest_ that is an element of _stream_.[[writeRequests]], - 1. Reject _writeRequest_ with _e_. - 1. Set _stream_.[[writeRequests]] to an empty List. + 1. Let _oldState_ be _stream_.[[state]]. + 1. Assert: _oldState_ is `"writable"` or `"closing"`. + 1. Set _stream_.[[state]] to `"errored"`. + 1. Set _stream_.[[storedError]] to _e_. + 1. Let _controller_ be _stream_.[[writableStreamController]]. + 1. If _controller_ is *undefined*, or both _controller_.[[writing]] and _controller_.[[inClose]] are *false*, + perform ! WritableStreamRejectPromisesInReactionToError(_stream_). 1. Let _writer_ be _stream_.[[writer]]. 1. If _writer_ is not undefined, - 1. Reject _writer_.[[closedPromise]] with _e_. - 1. Set _writer_.[[closedPromise]].[[PromiseIsHandled]] to *true*. - 1. If _state_ is `"writable"` and ! + 1. If _oldState_ is `"writable"` and ! WritableStreamDefaultControllerGetBackpressure(_stream_.[[writableStreamController]]) is *true*, reject _writer_.[[readyPromise]] with _e_. 1. Otherwise, set _writer_.[[readyPromise]] to a promise rejected with _e_. 1. Set _writer_.[[readyPromise]].[[PromiseIsHandled]] to *true*. - 1. Set _stream_.[[state]] to `"errored"`. - 1. Set _stream_.[[storedError]] to _e_.

WritableStreamFinishClose ( stream )

- 1. Assert: _stream_.[[state]] is `"closing"`. + 1. Assert: _stream_.[[state]] is `"closing"` or `"errored"`. 1. Assert: _stream_.[[writer]] is not *undefined*. - 1. Set _stream_.[[state]] to `"closed"`. - 1. Resolve _stream_.[[writer]].[[closedPromise]] with *undefined*. + 1. If _stream_.[[state]] is `"closing"`, + 1. Resolve _writer_.[[closedPromise]] with *undefined*. + 1. Set _stream_.[[state]] to `"closed"`. + 1. Otherwise, + 1. Assert: _stream_.[[state]] is `"errored"`. + 1. Reject _writer_.[[closedPromise]] with _stream_.[[storedError]]. + 1. Set _writer_.[[closedPromise]].[[PromiseIsHandled]] to *true*. + 1. If _stream_.[[pendingAbortRequest]] is not *undefined*, + 1. Resolve _stream_.[[pendingAbortRequest]] with *undefined*. + 1. Set _stream_.[[pendingAbortRequest]] to *undefined*. + + +

WritableStreamRejectPromisesInReactionToError ( stream )

+ + + 1. Assert: _stream_.[[state]] is `"errored"`. + 1. Assert: _stream_.[[pendingWriteRequest]] is *undefined*. + 1. Let _storedError_ be _stream_.[[storedError]]. + 1. Repeat for each _writeRequest_ that is an element of _stream_.[[writeRequests]], + 1. Reject _writeRequest_ with _storedError_. + 1. Set _stream_.[[writeRequests]] to an empty List. + 1. If _stream_.[[pendingCloseRequest]] is not *undefined*, + 1. Assert: _stream_.[[writableStreamController]].[[inClose]] is *false*. + 1. Reject _stream_.[[pendingCloseRequest]] with _storedError_. + 1. Set _stream_.[[pendingCloseRequest]] to *undefined*. + 1. Let _writer_ be _stream_.[[writer]]. + 1. If _writer_ is not undefined, + 1. Reject _writer_.[[closedPromise]] with _storedError_. + 1. Set _writer_.[[closedPromise]].[[PromiseIsHandled]] to *true*.

WritableStreamDefaultWriterClose ( writer )

1. Let _state_ be _stream_.[[state]]. 1. If _state_ is `"closed"` or `"errored"`, return a promise rejected with a *TypeError* exception. 1. Assert: _state_ is `"writable"`. - 1. Let _promise_ be ! WritableStreamAddWriteRequest(_stream_). + 1. Set _stream_.[[pendingCloseRequest]] to a new promise. 1. If ! WritableStreamDefaultControllerGetBackpressure(_stream_.[[writableStreamController]]) is *true*, resolve _writer_.[[readyPromise]] with *undefined*. 1. Set _stream_.[[state]] to `"closing"`. 1. Perform ! WritableStreamDefaultControllerClose(_stream_.[[writableStreamController]]). - 1. Return _promise_. + 1. Return _stream_.[[pendingCloseRequest]].

A boolean flag set to true while the underlying sink's write method is executing and has not yet fulfilled, used to prevent reentrant calls + + \[[inClose]] + A boolean flag set to true while the underlying sink's close method is + executing and has not yet fulfilled, used to prevent the {{WritableStreamDefaultWriter/abort()}} method from + interrupting close

stream, underlyingSink, WritableStreamDefaultControllerProcessClose ( controller )Upon fulfillment of _sinkClosePromise_, - 1. If _stream_.[[state]] is not `"closing"`, return. - 1. Perform ! WritableStreamFulfillWriteRequest(_stream_). + 1. Assert: _controller_.[[inClose]] is *true*. + 1. Set _controller_.[[inClose]] to *false*. + 1. If _stream_.[[state]] is not `"closing"` or `"errored"`, return. + 1. Assert: _stream_.[[pendingCloseRequest]] is not *undefined*. + 1. Resolve _stream_.[[pendingCloseRequest]] with *undefined*. + 1. Set _stream_.[[pendingCloseRequest]] to *undefined*. 1. Perform ! WritableStreamFinishClose(_stream_). 1. Upon rejection of _sinkClosePromise_ with reason _r_, + 1. Assert: _controller_.[[inClose]] it *true*. + 1. Set _controller_.[[InClose]] to *false*. + 1. Assert: _stream_.[[pendingCloseRequest]] is not *undefined*. + 1. Reject _stream_.[[pendingCloseRequest]] with _r_. + 1. Set _stream_.[[pendingCloseRequest]] to *undefined*. + 1. If _stream_.[[pendingAbortRequest]] is not *undefined*, + 1. Reject _stream_.[[pendingAbortRequest]] with _r_. + 1. Set _stream_.[[pendingAbortRequest]] to *undefined*. 1. Perform ! WritableStreamDefaultControllerErrorIfNeeded(_controller_, _r_). @@ -3373,14 +3437,28 @@ nothrow>WritableStreamDefaultControllerProcessWrite ( controller, 1. Set _controller_.[[writing]] to *true*. + 1. Let _stream_ be _controller_.[[controllerWritableStream]]. + 1. Assert: _stream_.[[pendingWriteRequest]] is undefined. + 1. Assert: _stream_.[[writeRequests]] is not empty. + 1. Let _writeRequest_ be the first element of _stream_.[[writeRequests]]. + 1. Remove _writeRequest_ from _stream_.[[writeRequests]], shifting all other elements downward (so that the second + becomes the first, and so on). + 1. Set _stream_.[[pendingWriteRequest]] to _writeRequest_. 1. Let _sinkWritePromise_ be ! PromiseInvokeOrNoop(_controller_.[[underlyingSink]], `"write"`, « ‍_chunk_, _controller_ »). 1. Upon fulfillment of _sinkWritePromise_, - 1. Let _stream_ be _controller_.[[controlledWritableStream]]. 1. Let _state_ be _stream_.[[state]]. - 1. If _state_ is `"errored"` or `"closed"`, return. + 1. Assert: _controller_.[[writing]] is *true*. 1. Set _controller_.[[writing]] to *false*. - 1. Perform ! WritableStreamFulfillWriteRequest(_stream_). + 1. Assert: _stream_.[[pendingWriteRequest]] is not *undefined*. + 1. Resolve _stream_.[[pendingWriteRequest]] with *undefined*. + 1. Set _stream_.[[pendingWriteRequest]] to *undefined*. + 1. If _state_ is `"errored"`, + 1. Perform ! WritableStreamRejectPromisesInReactionToError(_stream_). + 1. If _stream_.[[pendingAbortRequest]] is not *undefined*, + 1. Resolve _stream_.[[pendingAbortRequest]] with *undefined*. + 1. Set _stream_.[[pendingAbortRequest]] to *undefined*. + 1. Return. 1. Let _lastBackpressure_ be ! WritableStreamDefaultControllerGetBackpressure(_controller_). 1. Perform ! DequeueValue(_controller_.[[queue]]). 1. If _state_ is not `"closing"`, @@ -3388,7 +3466,18 @@ nothrow>WritableStreamDefaultControllerProcessWrite ( controller, Upon rejection of _sinkWritePromise_ with reason _r_, + 1. Upon rejection of _sinkWritePromise_ with _r_, + 1. Assert: _controller_.[[writing]] is *true*. + 1. Set _controller_.[[writing]] to *false*. + 1. Assert: _stream_.[[pendingWriteRequest]] is not *undefined*. + 1. Reject _stream_.[[pendingWriteRequest]] with _r_. + 1. Set _stream_.[[pendingWriteRequest]] to *undefined*. + 1. If _stream_.[[state]] is `"errored"`, + 1. Set _stream_.[[storedError]] to _r_. + 1. Perform ! WritableStreamRejectPromisesInReactionToError(_stream_). + 1. If _stream_.[[pendingAbortRequest]] is not *undefined*, + 1. Reject _stream_.[[pendingAbortRequest]] with _r_. + 1. Set _stream_.[[pendingAbortRequest]] to *undefined*. 1. Perform ! WritableStreamDefaultControllerErrorIfNeeded(_controller_, _r_). diff --git a/reference-implementation/lib/transform-stream.js b/reference-implementation/lib/transform-stream.js index cf0ab5154..148288d85 100644 --- a/reference-implementation/lib/transform-stream.js +++ b/reference-implementation/lib/transform-stream.js @@ -173,7 +173,10 @@ function TransformStreamTransform(transformStream, chunk) { return TransformStreamReadableReadyPromise(transformStream); }, - e => TransformStreamErrorIfNeeded(transformStream, e)); + e => { + TransformStreamErrorIfNeeded(transformStream, e); + return Promise.reject(e); + }); } function IsTransformStreamDefaultController(x) { diff --git a/reference-implementation/lib/writable-stream.js b/reference-implementation/lib/writable-stream.js index 29f69d89b..47a4a27ee 100644 --- a/reference-implementation/lib/writable-stream.js +++ b/reference-implementation/lib/writable-stream.js @@ -20,6 +20,17 @@ class WritableStream { // producer without waiting for the queued writes to finish. this._writeRequests = []; + // Write requests are removed from _writeRequests when write() is called on the underlying sink. This prevents + // them from being erroneously rejected on error. If a write() call is pending, the request is stored here. + this._pendingWriteRequest = undefined; + + // The promise that was returned from writer.close(). Stored here because it may be fulfilled after the writer + // has been detached. + this._pendingCloseRequest = undefined; + + // The promise that was returned from writer.abort(). This may also be fulfilled after the writer has detached. + this._pendingAbortRequest = undefined; + const type = underlyingSink.type; if (type !== undefined) { @@ -113,6 +124,23 @@ function WritableStreamAbort(stream, reason) { WritableStreamError(stream, error); + const controller = stream._writableStreamController; + assert(controller !== undefined); + if (controller._writing === true || controller._inClose === true) { + const promise = new Promise((resolve, reject) => { + const abortRequest = { + _resolve: resolve, + _reject: reject + }; + + stream._pendingAbortRequest = abortRequest; + }); + if (controller._writing === true) { + return promise.then(() => WritableStreamDefaultControllerAbort(stream._writableStreamController, reason)); + } + return promise; + } + return WritableStreamDefaultControllerAbort(stream._writableStreamController, reason); } @@ -135,20 +163,21 @@ function WritableStreamAddWriteRequest(stream) { } function WritableStreamError(stream, e) { - const state = stream._state; - assert(state === 'writable' || state === 'closing'); + const oldState = stream._state; + assert(oldState === 'writable' || oldState === 'closing'); + stream._state = 'errored'; + stream._storedError = e; - for (const writeRequest of stream._writeRequests) { - writeRequest._reject(e); + const controller = stream._writableStreamController; + // This method can be called during the construction of the controller, in which case "controller" will be undefined + // but the flags are guaranteed to be false anyway. + if (controller === undefined || controller._writing === false && controller._inClose === false) { + WritableStreamRejectPromisesInReactionToError(stream); } - stream._writeRequests = []; const writer = stream._writer; if (writer !== undefined) { - defaultWriterClosedPromiseReject(writer, e); - writer._closedPromise.catch(() => {}); - - if (state === 'writable' && + if (oldState === 'writable' && WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true) { defaultWriterReadyPromiseReject(writer, e); } else { @@ -156,24 +185,47 @@ function WritableStreamError(stream, e) { } writer._readyPromise.catch(() => {}); } - - stream._state = 'errored'; - stream._storedError = e; } function WritableStreamFinishClose(stream) { - assert(stream._state === 'closing'); + assert(stream._state === 'closing' || stream._state === 'errored'); - stream._state = 'closed'; + if (stream._state === 'closing') { + defaultWriterClosedPromiseResolve(stream._writer); + stream._state = 'closed'; + } else { + assert(stream._state === 'errored'); + defaultWriterClosedPromiseReject(stream._writer, stream._storedError); + stream._writer._closedPromise.catch(() => {}); + } - defaultWriterClosedPromiseResolve(stream._writer); + if (stream._pendingAbortRequest !== undefined) { + stream._pendingAbortRequest._resolve(); + stream._pendingAbortRequest = undefined; + } } -function WritableStreamFulfillWriteRequest(stream) { - assert(stream._writeRequests.length > 0); +function WritableStreamRejectPromisesInReactionToError(stream) { + assert(stream._state === 'errored'); + assert(stream._pendingWriteRequest === undefined); + + const storedError = stream._storedError; + for (const writeRequest of stream._writeRequests) { + writeRequest._reject(storedError); + } + stream._writeRequests = []; - const writeRequest = stream._writeRequests.shift(); - writeRequest._resolve(undefined); + if (stream._pendingCloseRequest !== undefined) { + assert(stream._writableStreamController._inClose === false); + stream._pendingCloseRequest._reject(storedError); + stream._pendingCloseRequest = undefined; + } + + const writer = stream._writer; + if (writer !== undefined) { + defaultWriterClosedPromiseReject(writer, storedError); + writer._closedPromise.catch(() => {}); + } } function WritableStreamUpdateBackpressure(stream, backpressure) { @@ -355,7 +407,14 @@ function WritableStreamDefaultWriterClose(writer) { assert(state === 'writable'); - const promise = WritableStreamAddWriteRequest(stream); + const promise = new Promise((resolve, reject) => { + const closeRequest = { + _resolve: resolve, + _reject: reject + }; + + stream._pendingCloseRequest = closeRequest; + }); if (WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true) { defaultWriterReadyPromiseResolve(writer); @@ -469,6 +528,7 @@ class WritableStreamDefaultController { this._queue = []; this._started = false; this._writing = false; + this._inClose = false; const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark); this._strategySize = normalizedStrategy.size; @@ -622,17 +682,32 @@ function WritableStreamDefaultControllerProcessClose(controller) { DequeueValue(controller._queue); assert(controller._queue.length === 0, 'queue must be empty once the final write record is dequeued'); + controller._inClose = true; const sinkClosePromise = PromiseInvokeOrNoop(controller._underlyingSink, 'close', [controller]); sinkClosePromise.then( () => { - if (stream._state !== 'closing') { + assert(controller._inClose === true); + controller._inClose = false; + if (stream._state !== 'closing' && stream._state !== 'errored') { return; } - WritableStreamFulfillWriteRequest(stream); + assert(stream._pendingCloseRequest !== undefined); + stream._pendingCloseRequest._resolve(undefined); + stream._pendingCloseRequest = undefined; + WritableStreamFinishClose(stream); }, r => { + assert(controller._inClose === true); + controller._inClose = false; + assert(stream._pendingCloseRequest !== undefined); + stream._pendingCloseRequest._reject(r); + stream._pendingCloseRequest = undefined; + if (stream._pendingAbortRequest !== undefined) { + stream._pendingAbortRequest._reject(r); + stream._pendingAbortRequest = undefined; + } WritableStreamDefaultControllerErrorIfNeeded(controller, r); } ) @@ -642,19 +717,32 @@ function WritableStreamDefaultControllerProcessClose(controller) { function WritableStreamDefaultControllerProcessWrite(controller, chunk) { controller._writing = true; + const stream = controller._controlledWritableStream; + + assert(stream._pendingWriteRequest === undefined); + assert(stream._writeRequests.length !== 0); + stream._pendingWriteRequest = stream._writeRequests.shift(); const sinkWritePromise = PromiseInvokeOrNoop(controller._underlyingSink, 'write', [chunk, controller]); sinkWritePromise.then( () => { - const stream = controller._controlledWritableStream; const state = stream._state; - if (state === 'errored' || state === 'closed') { - return; - } + assert(controller._writing === true); controller._writing = false; - WritableStreamFulfillWriteRequest(stream); + assert(stream._pendingWriteRequest !== undefined); + stream._pendingWriteRequest._resolve(undefined); + stream._pendingWriteRequest = undefined; + + if (state === 'errored') { + WritableStreamRejectPromisesInReactionToError(stream); + if (stream._pendingAbortRequest !== undefined) { + stream._pendingAbortRequest._resolve(); + stream._pendingAbortRequest = undefined; + } + return; + } const lastBackpressure = WritableStreamDefaultControllerGetBackpressure(controller); DequeueValue(controller._queue); if (state !== 'closing') { @@ -667,6 +755,20 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) { WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); }, r => { + assert(controller._writing === true); + controller._writing = false; + + assert(stream._pendingWriteRequest !== undefined); + stream._pendingWriteRequest._reject(r); + stream._pendingWriteRequest = undefined; + if (stream._state === 'errored') { + stream._storedError = r; + WritableStreamRejectPromisesInReactionToError(stream); + } + if (stream._pendingAbortRequest !== undefined) { + stream._pendingAbortRequest._reject(r); + stream._pendingAbortRequest = undefined; + } WritableStreamDefaultControllerErrorIfNeeded(controller, r); } ) diff --git a/reference-implementation/to-upstream-wpts/piping/error-propagation-backward.js b/reference-implementation/to-upstream-wpts/piping/error-propagation-backward.js index 7c076d536..1be82c2c0 100644 --- a/reference-implementation/to-upstream-wpts/piping/error-propagation-backward.js +++ b/reference-implementation/to-upstream-wpts/piping/error-propagation-backward.js @@ -611,8 +611,7 @@ promise_test(t => { const ws = recordingWritableStream({ write() { resolveWriteCalled(); - // never settles, which normally would hang the pipe, except we error via the controller. - return new Promise(() => {}); + return flushAsyncEvents(); } }); @@ -629,6 +628,6 @@ promise_test(t => { assert_array_equals(ws.events, ['write', 'a']); }); -}, 'Errors must be propagated backward: erroring via the controller errors a slow write'); +}, 'Errors must be propagated backward: erroring via the controller errors once pending write completes'); done(); diff --git a/reference-implementation/to-upstream-wpts/writable-streams/aborting.js b/reference-implementation/to-upstream-wpts/writable-streams/aborting.js index d98b6e4eb..cbb5228cb 100644 --- a/reference-implementation/to-upstream-wpts/writable-streams/aborting.js +++ b/reference-implementation/to-upstream-wpts/writable-streams/aborting.js @@ -75,8 +75,6 @@ promise_test(t => { }); }, 'Aborting a WritableStream immediately prevents future writes'); -// https://github.com/whatwg/streams/issues/611 -/* promise_test(t => { const ws = recordingWritableStream(); const results = []; @@ -91,19 +89,20 @@ promise_test(t => { promise_rejects(t, new TypeError(), writer.write(3), 'write(3) must reject with a TypeError') ); - writer.abort(); + const abortPromise = writer.abort(); results.push( promise_rejects(t, new TypeError(), writer.write(4), 'write(4) must reject with a TypeError'), promise_rejects(t, new TypeError(), writer.write(5), 'write(5) must reject with a TypeError') ); + + return abortPromise; }).then(() => { assert_array_equals(ws.events, ['write', 1, 'abort', undefined]); return Promise.all(results); }); }, 'Aborting a WritableStream prevents further writes after any that are in progress'); -*/ promise_test(() => { const ws = new WritableStream({ @@ -190,9 +189,12 @@ promise_test(t => { }, 'Closing but then immediately aborting a WritableStream causes the stream to error'); promise_test(t => { + let resolveClose; const ws = new WritableStream({ close() { - return new Promise(() => { }); // forever-pending + return new Promise(resolve => { + resolveClose = resolve; + }); } }); const writer = ws.getWriter(); @@ -201,11 +203,12 @@ promise_test(t => { return delay(0).then(() => { writer.abort(error1); - }) - .then(() => Promise.all([ - promise_rejects(t, new TypeError(), writer.closed, 'closed should reject with a TypeError'), - promise_rejects(t, new TypeError(), closePromise, 'close() should reject with a TypeError') - ])); + resolveClose(); + return Promise.all([ + promise_rejects(t, new TypeError(), writer.closed, 'closed should reject with a TypeError'), + closePromise + ]); + }); }, 'Closing a WritableStream and aborting it while it closes causes the stream to error'); promise_test(() => { @@ -255,4 +258,213 @@ promise_test(() => { return writer.abort().then(() => assert_true(thenCalled, 'then() should be called')); }, 'returning a thenable from abort() should work'); +promise_test(t => { + const ws = new WritableStream({ + write() { + return flushAsyncEvents(); + } + }); + const writer = ws.getWriter(); + return writer.ready.then(() => { + const writePromise = writer.write('a'); + writer.abort(error1); + let closedResolved = false; + return Promise.all([ + writePromise.then(() => assert_false(closedResolved, '.closed should not resolve before write()')), + promise_rejects(t, new TypeError(), writer.closed, '.closed should reject').then(() => { + closedResolved = true; + }) + ]); + }); +}, '.closed should not resolve before fulfilled write()'); + +promise_test(t => { + const ws = new WritableStream({ + write() { + return Promise.reject(error1); + } + }); + const writer = ws.getWriter(); + return writer.ready.then(() => { + const writePromise = writer.write('a'); + const abortPromise = writer.abort(error1); + let closedResolved = false; + return Promise.all([ + promise_rejects(t, error1, writePromise, 'write() should reject') + .then(() => assert_false(closedResolved, '.closed should not resolve before write()')), + promise_rejects(t, error1, writer.closed, '.closed should reject') + .then(() => { + closedResolved = true; + }), + promise_rejects(t, error1, abortPromise, 'abort() should reject')]); + }); +}, '.closed should not resolve before rejected write()'); + +promise_test(t => { + const ws = new WritableStream({ + write() { + return flushAsyncEvents(); + } + }, new CountQueuingStrategy(4)); + const writer = ws.getWriter(); + return writer.ready.then(() => { + const settlementOrder = []; + return Promise.all([ + writer.write('1').then(() => settlementOrder.push(1)), + promise_rejects(t, new TypeError(), writer.write('2'), 'first queued write should be rejected') + .then(() => settlementOrder.push(2)), + promise_rejects(t, new TypeError(), writer.write('3'), 'second queued write should be rejected') + .then(() => settlementOrder.push(3)), + writer.abort(error1) + ]).then(() => assert_array_equals([1, 2, 3], settlementOrder, 'writes should be satisfied in order')); + }); +}, 'writes should be satisfied in order when aborting'); + +promise_test(t => { + const ws = new WritableStream({ + write() { + return Promise.reject(error1); + } + }, new CountQueuingStrategy(4)); + const writer = ws.getWriter(); + return writer.ready.then(() => { + const settlementOrder = []; + return Promise.all([ + promise_rejects(t, error1, writer.write('1'), 'pending write should be rejected') + .then(() => settlementOrder.push(1)), + promise_rejects(t, error1, writer.write('2'), 'first queued write should be rejected') + .then(() => settlementOrder.push(2)), + promise_rejects(t, error1, writer.write('3'), 'second queued write should be rejected') + .then(() => settlementOrder.push(3)), + promise_rejects(t, error1, writer.abort(error1), 'abort should be rejected') + ]).then(() => assert_array_equals([1, 2, 3], settlementOrder, 'writes should be satisfied in order')); + }); +}, 'writes should be satisfied in order after rejected write when aborting'); + +promise_test(t => { + const ws = new WritableStream({ + write() { + return Promise.reject(error1); + } + }); + const writer = ws.getWriter(); + return writer.ready.then(() => { + return Promise.all([ + promise_rejects(t, error1, writer.write('a'), 'writer.write() should reject with error from underlying write()'), + promise_rejects(t, error1, writer.close(), 'writer.close() should reject with error from underlying write()'), + promise_rejects(t, error1, writer.abort(), 'writer.abort() should reject with error from underlying write()') + ]); + }); +}, 'close() should use error from underlying write() on abort'); + +promise_test(() => { + let resolveWrite; + let abort_called = false; + const ws = new WritableStream({ + write() { + return new Promise(resolve => { + resolveWrite = resolve; + }); + }, + abort() { + abort_called = true; + } + }); + + const writer = ws.getWriter(); + return writer.ready.then(() => { + writer.write('a'); + const abortPromise = writer.abort(); + return flushAsyncEvents().then(() => { + assert_false(abort_called, 'abort should not be called while write is pending'); + resolveWrite(); + return abortPromise.then(() => assert_true(abort_called, 'abort should be called')); + }); + }); +}, 'underlying abort() should not be called until underlying write() completes'); + +promise_test(() => { + let resolveClose; + let abort_called = false; + const ws = new WritableStream({ + close() { + return new Promise(resolve => { + resolveClose = resolve; + }); + }, + abort() { + abort_called = true; + } + }); + + const writer = ws.getWriter(); + return writer.ready.then(() => { + writer.close(); + const abortPromise = writer.abort(); + return flushAsyncEvents().then(() => { + assert_false(abort_called, 'abort should not be called while close is pending'); + resolveClose(); + return abortPromise.then(() => assert_false(abort_called, 'abort should not be called after close completes')); + }); + }); +}, 'underlying abort() should not be called if underlying close() has started'); + +promise_test(t => { + let resolveWrite; + let abort_called = false; + const ws = new WritableStream({ + write() { + return new Promise(resolve => { + resolveWrite = resolve; + }); + }, + abort() { + abort_called = true; + } + }); + + const writer = ws.getWriter(); + return writer.ready.then(() => { + writer.write('a'); + const closePromise = writer.close(); + const abortPromise = writer.abort(); + return flushAsyncEvents().then(() => { + assert_false(abort_called, 'abort should not be called while write is pending'); + resolveWrite(); + return abortPromise.then(() => { + assert_true(abort_called, 'abort should be called after write completes'); + return promise_rejects(t, new TypeError(), closePromise, 'promise returned by close() should be rejected'); + }); + }); + }); +}, 'underlying abort() should be called while closing if underlying close() has not started yet'); + +promise_test(() => { + const ws = new WritableStream(); + const writer = ws.getWriter(); + return writer.ready.then(() => { + const closePromise = writer.close(); + const abortPromise = writer.abort(); + let closeResolved = false; + Promise.all([ + closePromise.then(() => { closeResolved = true; }), + abortPromise.then(() => { assert_true(closeResolved, 'close() promise should resolve before abort() promise'); }) + ]); + }); +}, 'writer close() promise should resolve before abort() promise'); + +promise_test(t => { + const ws = new WritableStream({ + write(chunk, controller) { + controller.error(error1); + return new Promise(() => {}); + } + }); + const writer = ws.getWriter(); + return writer.ready.then(() => { + writer.write('a'); + return promise_rejects(t, error1, writer.ready, 'writer.ready should reject'); + }); +}, 'writer.ready should reject on controller error without waiting for underlying write'); + done(); diff --git a/reference-implementation/to-upstream-wpts/writable-streams/close.js b/reference-implementation/to-upstream-wpts/writable-streams/close.js index cbcad8dc2..fb1552379 100644 --- a/reference-implementation/to-upstream-wpts/writable-streams/close.js +++ b/reference-implementation/to-upstream-wpts/writable-streams/close.js @@ -33,7 +33,7 @@ promise_test(t => { const writer = ws.getWriter(); return Promise.all([ - promise_rejects(t, passedError, writer.close(), 'close() should be rejected with the passed error'), + writer.close(), delay(10).then(() => controller.error(passedError)), promise_rejects(t, passedError, writer.closed, 'closed promise should be rejected with the passed error'), @@ -51,8 +51,7 @@ promise_test(t => { const writer = ws.getWriter(); - return promise_rejects(t, passedError, writer.close(), 'close promise should be rejected with the passed error') - .then(() => promise_rejects(t, passedError, writer.closed, 'closed should stay rejected')); + return writer.close().then(() => promise_rejects(t, passedError, writer.closed, 'closed should stay rejected')); }, 'when sink calls error synchronously while closing, the stream should become errored'); promise_test(() => { diff --git a/reference-implementation/to-upstream-wpts/writable-streams/constructor.js b/reference-implementation/to-upstream-wpts/writable-streams/constructor.js index 30c7c70be..dc2bef9a1 100644 --- a/reference-implementation/to-upstream-wpts/writable-streams/constructor.js +++ b/reference-implementation/to-upstream-wpts/writable-streams/constructor.js @@ -36,7 +36,7 @@ promise_test(t => { const writer = ws.getWriter(); return Promise.all([ - promise_rejects(t, error1, writer.write('a'), 'write() should reject with the error'), + writer.write('a'), promise_rejects(t, error1, writer.closed, 'controller.error() in write() should errored the stream') ]); }, 'controller argument should be passed to write method'); @@ -51,7 +51,7 @@ promise_test(t => { const writer = ws.getWriter(); return Promise.all([ - promise_rejects(t, error1, writer.close(), 'close() should reject with the error'), + writer.close(), promise_rejects(t, error1, writer.closed, 'controller.error() in close() should error the stream') ]); }, 'controller argument should be passed to close method');