Skip to content

Commit

Permalink
Make write() and close() non-interruptible (#619)
Browse files Browse the repository at this point in the history
The underlying abort() method will now not be called until any pending
underlying write() has finished. If an underlying close() is in progress
then abort() will no longer be called at all.

Other behavioural changes:

* If the underlying operation has started, then the writer's write() and
close() methods will always reflect the result of the underlying operation,
rather than being rejected because abort() was called.

* Consistently with the above, if an underlying operation calls
controller.error() then the writer method will still reflect the result from
the underlying operation.

* If a call to writer.abort() has to wait for an underlying write() or
close() to complete, and that underlying operation rejects, then the
underlying abort() will not be called, and writer.abort() will return the
rejection from the failed operation.
  • Loading branch information
ricea authored Dec 5, 2016
1 parent 872188a commit 591a6ed
Show file tree
Hide file tree
Showing 7 changed files with 472 additions and 68 deletions.
133 changes: 111 additions & 22 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -2613,6 +2613,18 @@ Instances of {{WritableStream}} are created with the internal slots described in
<td>\[[writeRequests]]
<td>A List of promises representing the stream's internal queue of pending writes
</tr>
<tr>
<td>\[[pendingWriteRequest]]
<td>The promise for the current pending write operation
</tr>
<tr>
<td>\[[pendingCloseRequest]]
<td>The promise returned from the writer {{WritableStreamDefaultWriter/close()}} method
</tr>
<tr>
<td>\[[pendingAbortRequest]]
<td>The promise for a pending abort operation
</tr>
</table>

<h4 id="ws-constructor" constructor for="WritableStream" lt="WritableStream(underlyingSink, queuingStrategy)">new
Expand Down Expand Up @@ -2758,6 +2770,14 @@ writable stream is <a>locked to a writer</a>.
1. Assert: _state_ is `"writable"` or `"closing"`.
1. Let _error_ be a new *TypeError* indicating that the stream has been aborted.
1. Perform ! WritableStreamError(_stream_, _error_).
1. Let _controller_ be _stream_.[[writableStreamController]].
1. Assert: _controller_ is not *undefined*.
1. If _controller_.[[writing]] is *true* or _controller_.[[inClose]] is *true*,
1. Set _stream_.[[pendingAbortRequest]] to <a>a new promise</a>.
1. If _controller_.[[writing]] is *true*, return the result of transforming _stream_.[[pendingAbortRequest]] by a
fulfillment handler that returns !
WritableStreamDefaultControllerAbort(_stream_.[[writableStreamController]], _reason_).
1. Otherwise, return _stream_.[[pendingAbortRequest]].
1. Return ! WritableStreamDefaultControllerAbort(_stream_.[[writableStreamController]], _reason_).
</emu-alg>

Expand Down Expand Up @@ -2788,32 +2808,58 @@ visible through the {{WritableStream}}'s public API.
)</h4>

<emu-alg>
1. Let _state_ be _stream_.[[state]].
1. Assert: _state_ is `"writable"` or `"closing"`.
1. Repeat for each _writeRequest_ that is an element of _stream_.[[writeRequests]],
1. <a>Reject</a> _writeRequest_ with _e_.
1. Set _stream_.[[writeRequests]] to an empty List.
1. Let _oldState_ be _stream_.[[state]].
1. Assert: _oldState_ is `"writable"` or `"closing"`.
1. Set _stream_.[[state]] to `"errored"`.
1. Set _stream_.[[storedError]] to _e_.
1. Let _controller_ be _stream_.[[writableStreamController]].
1. If _controller_ is *undefined*, or both _controller_.[[writing]] and _controller_.[[inClose]] are *false*,
perform ! WritableStreamRejectPromisesInReactionToError(_stream_).
1. Let _writer_ be _stream_.[[writer]].
1. If _writer_ is not undefined,
1. <a>Reject</a> _writer_.[[closedPromise]] with _e_.
1. Set _writer_.[[closedPromise]].[[PromiseIsHandled]] to *true*.
1. If _state_ is `"writable"` and !
1. If _oldState_ is `"writable"` and !
WritableStreamDefaultControllerGetBackpressure(_stream_.[[writableStreamController]]) is *true*, <a>reject</a>
_writer_.[[readyPromise]] with _e_.
1. Otherwise, set _writer_.[[readyPromise]] to <a>a promise rejected with</a> _e_.
1. Set _writer_.[[readyPromise]].[[PromiseIsHandled]] to *true*.
1. Set _stream_.[[state]] to `"errored"`.
1. Set _stream_.[[storedError]] to _e_.
</emu-alg>

<h4 id="writable-stream-finish-close" aoid="WritableStreamFinishClose" nothrow>WritableStreamFinishClose (
<var>stream</var> )</h4>

<emu-alg>
1. Assert: _stream_.[[state]] is `"closing"`.
1. Assert: _stream_.[[state]] is `"closing"` or `"errored"`.
1. Assert: _stream_.[[writer]] is not *undefined*.
1. Set _stream_.[[state]] to `"closed"`.
1. <a>Resolve</a> _stream_.[[writer]].[[closedPromise]] with *undefined*.
1. If _stream_.[[state]] is `"closing"`,
1. <a>Resolve</a> _writer_.[[closedPromise]] with *undefined*.
1. Set _stream_.[[state]] to `"closed"`.
1. Otherwise,
1. Assert: _stream_.[[state]] is `"errored"`.
1. <a>Reject</a> _writer_.[[closedPromise]] with _stream_.[[storedError]].
1. Set _writer_.[[closedPromise]].[[PromiseIsHandled]] to *true*.
1. If _stream_.[[pendingAbortRequest]] is not *undefined*,
1. <a>Resolve</a> _stream_.[[pendingAbortRequest]] with *undefined*.
1. Set _stream_.[[pendingAbortRequest]] to *undefined*.
</emu-alg>

<h4 id="writable-stream-reject-unresolved-promises" aoid="WritableStreamRejectPromisesInReactionToError"
nothrow>WritableStreamRejectPromisesInReactionToError ( <var>stream</var> )</h4>

<emu-alg>
1. Assert: _stream_.[[state]] is `"errored"`.
1. Assert: _stream_.[[pendingWriteRequest]] is *undefined*.
1. Let _storedError_ be _stream_.[[storedError]].
1. Repeat for each _writeRequest_ that is an element of _stream_.[[writeRequests]],
1. <a>Reject</a> _writeRequest_ with _storedError_.
1. Set _stream_.[[writeRequests]] to an empty List.
1. If _stream_.[[pendingCloseRequest]] is not *undefined*,
1. Assert: _stream_.[[writableStreamController]].[[inClose]] is *false*.
1. <a>Reject</a> _stream_.[[pendingCloseRequest]] with _storedError_.
1. Set _stream_.[[pendingCloseRequest]] to *undefined*.
1. Let _writer_ be _stream_.[[writer]].
1. If _writer_ is not undefined,
1. <a>Reject</a> _writer_.[[closedPromise]] with _storedError_.
1. Set _writer_.[[closedPromise]].[[PromiseIsHandled]] to *true*.
</emu-alg>

<h4 id="writable-stream-fulfill-write-request" aoid="WritableStreamFulfillWriteRequest"
Expand Down Expand Up @@ -3077,12 +3123,12 @@ nothrow>WritableStreamDefaultWriterClose ( <var>writer</var> )</h4>
1. Let _state_ be _stream_.[[state]].
1. If _state_ is `"closed"` or `"errored"`, return <a>a promise rejected with</a> a *TypeError* exception.
1. Assert: _state_ is `"writable"`.
1. Let _promise_ be ! WritableStreamAddWriteRequest(_stream_).
1. Set _stream_.[[pendingCloseRequest]] to <a>a new promise</a>.
1. If ! WritableStreamDefaultControllerGetBackpressure(_stream_.[[writableStreamController]]) is *true*, <a>resolve</a>
_writer_.[[readyPromise]] with *undefined*.
1. Set _stream_.[[state]] to `"closing"`.
1. Perform ! WritableStreamDefaultControllerClose(_stream_.[[writableStreamController]]).
1. Return _promise_.
1. Return _stream_.[[pendingCloseRequest]].
</emu-alg>

<h4 id="writable-stream-default-writer-close-with-error-propagation" aoid="WritableStreamDefaultWriterCloseWithErrorPropagation"
Expand Down Expand Up @@ -3212,6 +3258,11 @@ Instances of {{WritableStreamDefaultController}} are created with the internal s
<td>A boolean flag set to <emu-val>true</emu-val> while the <a>underlying sink</a>'s <code>write</code> method is
executing and has not yet fulfilled, used to prevent reentrant calls
</tr>
<tr>
<td>\[[inClose]]
<td>A boolean flag set to <emu-val>true</emu-val> while the <a>underlying sink</a>'s <code>close</code> method is
executing and has not yet fulfilled, used to prevent the {{WritableStreamDefaultWriter/abort()}} method from
interrupting close
</table>

<h4 id="ws-default-controller-constructor" constructor for="WritableStreamDefaultController"
Expand All @@ -3230,7 +3281,7 @@ WritableStreamDefaultController(<var>stream</var>, <var>underlyingSink</var>, <v
1. Set *this*.[[controlledWritableStream]] to _stream_.
1. Set *this*.[[underlyingSink]] to _underlyingSink_.
1. Set *this*.[[queue]] to a new empty List.
1. Set *this*.[[started]] and *this*.[[writing]] to *false*.
1. Set *this*.[[started]] and *this*.[[writing]] and *this*.[[inClose]] to *false*.
1. Let _normalizedStrategy_ be ? ValidateAndNormalizeQueuingStrategy(_size_, _highWaterMark_).
1. Set *this*.[[strategySize]] to _normalizedStrategy_.[[size]] and *this*.[[strategyHWM]] to
_normalizedStrategy_.[[highWaterMark]].
Expand Down Expand Up @@ -3359,12 +3410,25 @@ nothrow>WritableStreamDefaultControllerProcessClose ( <var>controller</var> )</h
1. Assert: _stream_.[[state]] is `"closing"`.
1. Perform ! DequeueValue(_controller_.[[queue]]).
1. Assert: _controller_.[[queue]] is empty.
1. Set _controller_.[[InClose]] to *true*.
1. Let _sinkClosePromise_ be ! PromiseInvokeOrNoop(_controller_.[[underlyingSink]], `"close"`, « ‍_controller_ »).
1. <a>Upon fulfillment</a> of _sinkClosePromise_,
1. If _stream_.[[state]] is not `"closing"`, return.
1. Perform ! WritableStreamFulfillWriteRequest(_stream_).
1. Assert: _controller_.[[inClose]] is *true*.
1. Set _controller_.[[inClose]] to *false*.
1. If _stream_.[[state]] is not `"closing"` or `"errored"`, return.

This comment has been minimized.

Copy link
@tyoshino

tyoshino Jan 6, 2017

Member

Can't we make this an assertion?

This comment has been minimized.

Copy link
@ricea

ricea Jan 6, 2017

Author Collaborator

Yes. Thank you for spotting this. #641.

1. Assert: _stream_.[[pendingCloseRequest]] is not *undefined*.
1. <a>Resolve</a> _stream_.[[pendingCloseRequest]] with *undefined*.
1. Set _stream_.[[pendingCloseRequest]] to *undefined*.
1. Perform ! WritableStreamFinishClose(_stream_).
1. <a>Upon rejection</a> of _sinkClosePromise_ with reason _r_,
1. Assert: _controller_.[[inClose]] it *true*.
1. Set _controller_.[[InClose]] to *false*.
1. Assert: _stream_.[[pendingCloseRequest]] is not *undefined*.
1. <a>Reject</a> _stream_.[[pendingCloseRequest]] with _r_.
1. Set _stream_.[[pendingCloseRequest]] to *undefined*.
1. If _stream_.[[pendingAbortRequest]] is not *undefined*,
1. <a>Reject</a> _stream_.[[pendingAbortRequest]] with _r_.
1. Set _stream_.[[pendingAbortRequest]] to *undefined*.
1. Perform ! WritableStreamDefaultControllerErrorIfNeeded(_controller_, _r_).
</emu-alg>

Expand All @@ -3373,22 +3437,47 @@ nothrow>WritableStreamDefaultControllerProcessWrite ( <var>controller</var>, <va

<emu-alg>
1. Set _controller_.[[writing]] to *true*.
1. Let _stream_ be _controller_.[[controllerWritableStream]].
1. Assert: _stream_.[[pendingWriteRequest]] is undefined.
1. Assert: _stream_.[[writeRequests]] is not empty.
1. Let _writeRequest_ be the first element of _stream_.[[writeRequests]].
1. Remove _writeRequest_ from _stream_.[[writeRequests]], shifting all other elements downward (so that the second
becomes the first, and so on).
1. Set _stream_.[[pendingWriteRequest]] to _writeRequest_.
1. Let _sinkWritePromise_ be ! PromiseInvokeOrNoop(_controller_.[[underlyingSink]], `"write"`, « ‍_chunk_,
_controller_ »).
1. <a>Upon fulfillment</a> of _sinkWritePromise_,
1. Let _stream_ be _controller_.[[controlledWritableStream]].
1. Let _state_ be _stream_.[[state]].
1. If _state_ is `"errored"` or `"closed"`, return.
1. Assert: _controller_.[[writing]] is *true*.
1. Set _controller_.[[writing]] to *false*.
1. Perform ! WritableStreamFulfillWriteRequest(_stream_).
1. Assert: _stream_.[[pendingWriteRequest]] is not *undefined*.
1. <a>Resolve</a> _stream_.[[pendingWriteRequest]] with *undefined*.
1. Set _stream_.[[pendingWriteRequest]] to *undefined*.
1. If _state_ is `"errored"`,
1. Perform ! WritableStreamRejectPromisesInReactionToError(_stream_).
1. If _stream_.[[pendingAbortRequest]] is not *undefined*,
1. <a>Resolve</a> _stream_.[[pendingAbortRequest]] with *undefined*.
1. Set _stream_.[[pendingAbortRequest]] to *undefined*.
1. Return.
1. Let _lastBackpressure_ be ! WritableStreamDefaultControllerGetBackpressure(_controller_).
1. Perform ! DequeueValue(_controller_.[[queue]]).
1. If _state_ is not `"closing"`,
1. Let _backpressure_ be ! WritableStreamDefaultControllerGetBackpressure(_controller_).
1. If _lastBackpressure_ is not _backpressure_, perform !
WritableStreamUpdateBackpressure(_controller_.[[controlledWritableStream]], _backpressure_).
1. Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(_controller_).
1. <a>Upon rejection</a> of _sinkWritePromise_ with reason _r_,
1. <a>Upon rejection</a> of _sinkWritePromise_ with _r_,
1. Assert: _controller_.[[writing]] is *true*.
1. Set _controller_.[[writing]] to *false*.
1. Assert: _stream_.[[pendingWriteRequest]] is not *undefined*.
1. <a>Reject</a> _stream_.[[pendingWriteRequest]] with _r_.
1. Set _stream_.[[pendingWriteRequest]] to *undefined*.
1. If _stream_.[[state]] is `"errored"`,
1. Set _stream_.[[storedError]] to _r_.
1. Perform ! WritableStreamRejectPromisesInReactionToError(_stream_).
1. If _stream_.[[pendingAbortRequest]] is not *undefined*,
1. <a>Reject</a> _stream_.[[pendingAbortRequest]] with _r_.
1. Set _stream_.[[pendingAbortRequest]] to *undefined*.
1. Perform ! WritableStreamDefaultControllerErrorIfNeeded(_controller_, _r_).
</emu-alg>

Expand Down
5 changes: 4 additions & 1 deletion reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ function TransformStreamTransform(transformStream, chunk) {

return TransformStreamReadableReadyPromise(transformStream);
},
e => TransformStreamErrorIfNeeded(transformStream, e));
e => {
TransformStreamErrorIfNeeded(transformStream, e);
return Promise.reject(e);
});
}

function IsTransformStreamDefaultController(x) {
Expand Down
Loading

0 comments on commit 591a6ed

Please sign in to comment.