Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aborting a stream should wait for pending writes #619

Merged
merged 13 commits into from
Dec 5, 2016
133 changes: 111 additions & 22 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -2613,6 +2613,18 @@ Instances of {{WritableStream}} are created with the internal slots described in
<td>\[[writeRequests]]
<td>A List of promises representing the stream's internal queue of pending writes
</tr>
<tr>
<td>\[[pendingWriteRequest]]
<td>The promise for the current pending write operation
</tr>
<tr>
<td>\[[pendingCloseRequest]]
<td>The promise returned from the writer {{WritableStreamDefaultWriter/close()}} method
</tr>
<tr>
<td>\[[pendingAbortRequest]]
<td>The promise for a pending abort operation
</tr>
</table>

<h4 id="ws-constructor" constructor for="WritableStream" lt="WritableStream(underlyingSink, queuingStrategy)">new
Expand Down Expand Up @@ -2758,6 +2770,14 @@ writable stream is <a>locked to a writer</a>.
1. Assert: _state_ is `"writable"` or `"closing"`.
1. Let _error_ be a new *TypeError* indicating that the stream has been aborted.
1. Perform ! WritableStreamError(_stream_, _error_).
1. Let _controller_ be _stream_.[[writableStreamController]].
1. Assert: _controller_ is not *undefined*.
1. If _controller_.[[writing]] is *true* or _controller_.[[inClose]] is *true*,
1. Set _stream_.[[pendingAbortRequest]] to <a>a new promise</a>.
1. If _controller_.[[writing]] is *true*, return the result of transforming _stream_.[[pendingAbortRequest]] by a
fulfillment handler that returns !
WritableStreamDefaultControllerAbort(_stream_.[[writableStreamController]], _reason_).
1. Otherwise, return _stream_.[[pendingAbortRequest]].
1. Return ! WritableStreamDefaultControllerAbort(_stream_.[[writableStreamController]], _reason_).
</emu-alg>

Expand Down Expand Up @@ -2788,32 +2808,58 @@ visible through the {{WritableStream}}'s public API.
)</h4>

<emu-alg>
1. Let _state_ be _stream_.[[state]].
1. Assert: _state_ is `"writable"` or `"closing"`.
1. Repeat for each _writeRequest_ that is an element of _stream_.[[writeRequests]],
1. <a>Reject</a> _writeRequest_ with _e_.
1. Set _stream_.[[writeRequests]] to an empty List.
1. Let _oldState_ be _stream_.[[state]].
1. Assert: _oldState_ is `"writable"` or `"closing"`.
1. Set _stream_.[[state]] to `"errored"`.
1. Set _stream_.[[storedError]] to _e_.
1. Let _controller_ be _stream_.[[writableStreamController]].
1. If _controller_ is *undefined*, or both _controller_.[[writing]] and _controller_.[[inClose]] are *false*,
perform ! WritableStreamRejectPromisesInReactionToError(_stream_).
1. Let _writer_ be _stream_.[[writer]].
1. If _writer_ is not undefined,
1. <a>Reject</a> _writer_.[[closedPromise]] with _e_.
1. Set _writer_.[[closedPromise]].[[PromiseIsHandled]] to *true*.
1. If _state_ is `"writable"` and !
1. If _oldState_ is `"writable"` and !
WritableStreamDefaultControllerGetBackpressure(_stream_.[[writableStreamController]]) is *true*, <a>reject</a>
_writer_.[[readyPromise]] with _e_.
1. Otherwise, set _writer_.[[readyPromise]] to <a>a promise rejected with</a> _e_.
1. Set _writer_.[[readyPromise]].[[PromiseIsHandled]] to *true*.
1. Set _stream_.[[state]] to `"errored"`.
1. Set _stream_.[[storedError]] to _e_.
</emu-alg>

<h4 id="writable-stream-finish-close" aoid="WritableStreamFinishClose" nothrow>WritableStreamFinishClose (
<var>stream</var> )</h4>

<emu-alg>
1. Assert: _stream_.[[state]] is `"closing"`.
1. Assert: _stream_.[[state]] is `"closing"` or `"errored"`.
1. Assert: _stream_.[[writer]] is not *undefined*.
1. Set _stream_.[[state]] to `"closed"`.
1. <a>Resolve</a> _stream_.[[writer]].[[closedPromise]] with *undefined*.
1. If _stream_.[[state]] is `"closing"`,
1. <a>Resolve</a> _writer_.[[closedPromise]] with *undefined*.
1. Set _stream_.[[state]] to `"closed"`.
1. Otherwise,
1. Assert: _stream_.[[state]] is `"errored"`.
1. <a>Reject</a> _writer_.[[closedPromise]] with _stream_.[[storedError]].
1. Set _writer_.[[closedPromise]].[[PromiseIsHandled]] to *true*.
1. If _stream_.[[pendingAbortRequest]] is not *undefined*,
1. <a>Resolve</a> _stream_.[[pendingAbortRequest]] with *undefined*.
1. Set _stream_.[[pendingAbortRequest]] to *undefined*.
</emu-alg>

<h4 id="writable-stream-reject-unresolved-promises" aoid="WritableStreamRejectPromisesInReactionToError"
nothrow>WritableStreamRejectPromisesInReactionToError ( <var>stream</var> )</h4>

<emu-alg>
1. Assert: _stream_.[[state]] is `"errored"`.
1. Assert: _stream_.[[pendingWriteRequest]] is *undefined*.
1. Let _storedError_ be _stream_.[[storedError]].
1. Repeat for each _writeRequest_ that is an element of _stream_.[[writeRequests]],
1. <a>Reject</a> _writeRequest_ with _storedError_.
1. Set _stream_.[[writeRequests]] to an empty List.
1. If _stream_.[[pendingCloseRequest]] is not *undefined*,
1. Assert: _stream_.[[writableStreamController]].[[inClose]] is *false*.
1. <a>Reject</a> _stream_.[[pendingCloseRequest]] with _storedError_.
1. Set _stream_.[[pendingCloseRequest]] to *undefined*.
1. Let _writer_ be _stream_.[[writer]].
1. If _writer_ is not undefined,
1. <a>Reject</a> _writer_.[[closedPromise]] with _storedError_.
1. Set _writer_.[[closedPromise]].[[PromiseIsHandled]] to *true*.
</emu-alg>

<h4 id="writable-stream-fulfill-write-request" aoid="WritableStreamFulfillWriteRequest"
Expand Down Expand Up @@ -3077,12 +3123,12 @@ nothrow>WritableStreamDefaultWriterClose ( <var>writer</var> )</h4>
1. Let _state_ be _stream_.[[state]].
1. If _state_ is `"closed"` or `"errored"`, return <a>a promise rejected with</a> a *TypeError* exception.
1. Assert: _state_ is `"writable"`.
1. Let _promise_ be ! WritableStreamAddWriteRequest(_stream_).
1. Set _stream_.[[pendingCloseRequest]] to <a>a new promise</a>.
1. If ! WritableStreamDefaultControllerGetBackpressure(_stream_.[[writableStreamController]]) is *true*, <a>resolve</a>
_writer_.[[readyPromise]] with *undefined*.
1. Set _stream_.[[state]] to `"closing"`.
1. Perform ! WritableStreamDefaultControllerClose(_stream_.[[writableStreamController]]).
1. Return _promise_.
1. Return _stream_.[[pendingCloseRequest]].
</emu-alg>

<h4 id="writable-stream-default-writer-close-with-error-propagation" aoid="WritableStreamDefaultWriterCloseWithErrorPropagation"
Expand Down Expand Up @@ -3212,6 +3258,11 @@ Instances of {{WritableStreamDefaultController}} are created with the internal s
<td>A boolean flag set to <emu-val>true</emu-val> while the <a>underlying sink</a>'s <code>write</code> method is
executing and has not yet fulfilled, used to prevent reentrant calls
</tr>
<tr>
<td>\[[inClose]]
<td>A boolean flag set to <emu-val>true</emu-val> while the <a>underlying sink</a>'s <code>close</code> method is
executing and has not yet fulfilled, used to prevent the {{WritableStreamDefaultWriter/abort()}} method from
interrupting close
</table>

<h4 id="ws-default-controller-constructor" constructor for="WritableStreamDefaultController"
Expand All @@ -3230,7 +3281,7 @@ WritableStreamDefaultController(<var>stream</var>, <var>underlyingSink</var>, <v
1. Set *this*.[[controlledWritableStream]] to _stream_.
1. Set *this*.[[underlyingSink]] to _underlyingSink_.
1. Set *this*.[[queue]] to a new empty List.
1. Set *this*.[[started]] and *this*.[[writing]] to *false*.
1. Set *this*.[[started]] and *this*.[[writing]] and *this*.[[inClose]] to *false*.
1. Let _normalizedStrategy_ be ? ValidateAndNormalizeQueuingStrategy(_size_, _highWaterMark_).
1. Set *this*.[[strategySize]] to _normalizedStrategy_.[[size]] and *this*.[[strategyHWM]] to
_normalizedStrategy_.[[highWaterMark]].
Expand Down Expand Up @@ -3359,12 +3410,25 @@ nothrow>WritableStreamDefaultControllerProcessClose ( <var>controller</var> )</h
1. Assert: _stream_.[[state]] is `"closing"`.
1. Perform ! DequeueValue(_controller_.[[queue]]).
1. Assert: _controller_.[[queue]] is empty.
1. Set _controller_.[[InClose]] to *true*.
1. Let _sinkClosePromise_ be ! PromiseInvokeOrNoop(_controller_.[[underlyingSink]], `"close"`, « ‍_controller_ »).
1. <a>Upon fulfillment</a> of _sinkClosePromise_,
1. If _stream_.[[state]] is not `"closing"`, return.
1. Perform ! WritableStreamFulfillWriteRequest(_stream_).
1. Assert: _controller_.[[inClose]] is *true*.
1. Set _controller_.[[inClose]] to *false*.
1. If _stream_.[[state]] is not `"closing"` or `"errored"`, return.
1. Assert: _stream_.[[pendingCloseRequest]] is not *undefined*.
1. <a>Resolve</a> _stream_.[[pendingCloseRequest]] with *undefined*.
1. Set _stream_.[[pendingCloseRequest]] to *undefined*.
1. Perform ! WritableStreamFinishClose(_stream_).
1. <a>Upon rejection</a> of _sinkClosePromise_ with reason _r_,
1. Assert: _controller_.[[inClose]] it *true*.
1. Set _controller_.[[InClose]] to *false*.
1. Assert: _stream_.[[pendingCloseRequest]] is not *undefined*.
1. <a>Reject</a> _stream_.[[pendingCloseRequest]] with _r_.
1. Set _stream_.[[pendingCloseRequest]] to *undefined*.
1. If _stream_.[[pendingAbortRequest]] is not *undefined*,
1. <a>Reject</a> _stream_.[[pendingAbortRequest]] with _r_.
1. Set _stream_.[[pendingAbortRequest]] to *undefined*.
1. Perform ! WritableStreamDefaultControllerErrorIfNeeded(_controller_, _r_).
</emu-alg>

Expand All @@ -3373,22 +3437,47 @@ nothrow>WritableStreamDefaultControllerProcessWrite ( <var>controller</var>, <va

<emu-alg>
1. Set _controller_.[[writing]] to *true*.
1. Let _stream_ be _controller_.[[controllerWritableStream]].
1. Assert: _stream_.[[pendingWriteRequest]] is undefined.
1. Assert: _stream_.[[writeRequests]] is not empty.
1. Let _writeRequest_ be the first element of _stream_.[[writeRequests]].
1. Remove _writeRequest_ from _stream_.[[writeRequests]], shifting all other elements downward (so that the second
becomes the first, and so on).
1. Set _stream_.[[pendingWriteRequest]] to _writeRequest_.
1. Let _sinkWritePromise_ be ! PromiseInvokeOrNoop(_controller_.[[underlyingSink]], `"write"`, « ‍_chunk_,
_controller_ »).
1. <a>Upon fulfillment</a> of _sinkWritePromise_,
1. Let _stream_ be _controller_.[[controlledWritableStream]].
1. Let _state_ be _stream_.[[state]].
1. If _state_ is `"errored"` or `"closed"`, return.
1. Assert: _controller_.[[writing]] is *true*.
1. Set _controller_.[[writing]] to *false*.
1. Perform ! WritableStreamFulfillWriteRequest(_stream_).
1. Assert: _stream_.[[pendingWriteRequest]] is not *undefined*.
1. <a>Resolve</a> _stream_.[[pendingWriteRequest]] with *undefined*.
1. Set _stream_.[[pendingWriteRequest]] to *undefined*.
1. If _state_ is `"errored"`,
1. Perform ! WritableStreamRejectPromisesInReactionToError(_stream_).
1. If _stream_.[[pendingAbortRequest]] is not *undefined*,
1. <a>Resolve</a> _stream_.[[pendingAbortRequest]] with *undefined*.
1. Set _stream_.[[pendingAbortRequest]] to *undefined*.
1. Return.
1. Let _lastBackpressure_ be ! WritableStreamDefaultControllerGetBackpressure(_controller_).
1. Perform ! DequeueValue(_controller_.[[queue]]).
1. If _state_ is not `"closing"`,
1. Let _backpressure_ be ! WritableStreamDefaultControllerGetBackpressure(_controller_).
1. If _lastBackpressure_ is not _backpressure_, perform !
WritableStreamUpdateBackpressure(_controller_.[[controlledWritableStream]], _backpressure_).
1. Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(_controller_).
1. <a>Upon rejection</a> of _sinkWritePromise_ with reason _r_,
1. <a>Upon rejection</a> of _sinkWritePromise_ with _r_,
1. Assert: _controller_.[[writing]] is *true*.
1. Set _controller_.[[writing]] to *false*.
1. Assert: _stream_.[[pendingWriteRequest]] is not *undefined*.
1. <a>Reject</a> _stream_.[[pendingWriteRequest]] with _r_.
1. Set _stream_.[[pendingWriteRequest]] to *undefined*.
1. If _stream_.[[state]] is `"errored"`,
1. Set _stream_.[[storedError]] to _r_.
1. Perform ! WritableStreamRejectPromisesInReactionToError(_stream_).
1. If _stream_.[[pendingAbortRequest]] is not *undefined*,
1. <a>Reject</a> _stream_.[[pendingAbortRequest]] with _r_.
1. Set _stream_.[[pendingAbortRequest]] to *undefined*.
1. Perform ! WritableStreamDefaultControllerErrorIfNeeded(_controller_, _r_).
</emu-alg>

Expand Down
5 changes: 4 additions & 1 deletion reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ function TransformStreamTransform(transformStream, chunk) {

return TransformStreamReadableReadyPromise(transformStream);
},
e => TransformStreamErrorIfNeeded(transformStream, e));
e => {
TransformStreamErrorIfNeeded(transformStream, e);
return Promise.reject(e);
});
}

function IsTransformStreamDefaultController(x) {
Expand Down
Loading