Skip to content

Commit

Permalink
Add close() method to WritableStream
Browse files Browse the repository at this point in the history
Fixes #1007.
  • Loading branch information
MattiasBuelens authored and domenic committed Nov 3, 2019
1 parent d26db0e commit 547428b
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 44 deletions.
55 changes: 39 additions & 16 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -3171,6 +3171,7 @@ like
get <a href="#ws-locked">locked</a>()

<a href="#ws-abort">abort</a>(reason)
<a href="#ws-close">close</a>()
<a href="#ws-get-writer">getWriter</a>()
}
</code></pre>
Expand Down Expand Up @@ -3385,6 +3386,25 @@ bridging the gap with non-promise-based APIs, as seen for example in [[#example-
1. Return ! WritableStreamAbort(*this*, _reason_).
</emu-alg>

<h5 id="ws-close" method for="WritableStream">close()</h5>

<div class="note">
The <code>close</code> method closes the stream. The <a>underlying sink</a> will finish processing any
previously-written <a>chunks</a>, before invoking its close behavior. During this time any further attempts to write
will fail (without erroring the stream).

The method returns a promise that is fulfilled with <emu-val>undefined</emu-val> if all remaining <a>chunks</a> are
successfully written and the stream successfully closes, or rejects if an error is encountered during this process.
</div>

<emu-alg>
1. If ! IsWritableStream(*this*) is *false*, return <a>a promise rejected with</a> a *TypeError* exception.
1. If ! IsWritableStreamLocked(*this*) is *true*, return <a>a promise rejected with</a> a *TypeError* exception.
1. If ! WritableStreamCloseQueuedOrInFlight(*this*) is *true*, return <a>a promise rejected with</a> a *TypeError*
exception.
1. Return ! WritableStreamClose(*this*).
</emu-alg>

<h5 id="ws-get-writer" method for="WritableStream">getWriter()</h5>

<div class="note">
Expand Down Expand Up @@ -3490,6 +3510,22 @@ writable stream is <a>locked to a writer</a>.
1. Return _promise_.
</emu-alg>

<h4 id="writable-stream-close" aoid="WritableStreamClose" nothrow>WritableStreamClose ( <var>stream</var> )</h4>

<emu-alg>
1. Let _state_ be _stream_.[[state]].
1. If _state_ is `"closed"` or `"errored"`, return <a>a promise rejected with</a> a *TypeError* exception.
1. Assert: _state_ is `"writable"` or `"erroring"`.
1. Assert: ! WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.
1. Let _promise_ be <a>a new promise</a>.
1. Set _stream_.[[closeRequest]] to _promise_.
1. Let _writer_ be _stream_.[[writer]].
1. If _writer_ is not *undefined*, and _stream_.[[backpressure]] is *true*, and _state_ is `"writable"`,
<a>resolve</a> _writer_.[[readyPromise]] with *undefined*.
1. Perform ! WritableStreamDefaultControllerClose(_stream_.[[writableStreamController]]).
1. Return _promise_.
</emu-alg>

<h3 id="ws-abstract-ops-used-by-controllers">Writable stream abstract operations used by controllers</h3>

To allow future flexibility to add different writable stream behaviors (similar to the distinction between default
Expand Down Expand Up @@ -3853,12 +3889,8 @@ lt="WritableStreamDefaultWriter(stream)">new WritableStreamDefaultWriter(<var>st
<h5 id="default-writer-close" method for="WritableStreamDefaultWriter">close()</h5>

<div class="note">
The <code>close</code> method will close the associated writable stream. The <a>underlying sink</a> will finish
processing any previously-written <a>chunks</a>, before invoking its close behavior. During this time any further
attempts to write will fail (without erroring the stream).

The method returns a promise that is fulfilled with <emu-val>undefined</emu-val> if all remaining <a>chunks</a> are
successfully written and the stream successfully closes, or rejects if an error is encountered during this process.
If the writer is <a lt="active writer">active</a>, the <code>close</code> method behaves the same as that for the
associated stream. (Otherwise, it returns a rejected promise.)
</div>

<emu-alg>
Expand Down Expand Up @@ -3936,16 +3968,7 @@ nothrow>WritableStreamDefaultWriterClose ( <var>writer</var> )</h4>
<emu-alg>
1. Let _stream_ be _writer_.[[ownerWritableStream]].
1. Assert: _stream_ is not *undefined*.
1. Let _state_ be _stream_.[[state]].
1. If _state_ is `"closed"` or `"errored"`, return <a>a promise rejected with</a> a *TypeError* exception.
1. Assert: _state_ is `"writable"` or `"erroring"`.
1. Assert: ! WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.
1. Let _promise_ be <a>a new promise</a>.
1. Set _stream_.[[closeRequest]] to _promise_.
1. If _stream_.[[backpressure]] is *true* and _state_ is `"writable"`, <a>resolve</a> _writer_.[[readyPromise]] with
*undefined*.
1. Perform ! WritableStreamDefaultControllerClose(_stream_.[[writableStreamController]]).
1. Return _promise_.
1. Return ! WritableStreamClose(_stream_).
</emu-alg>

<h4 id="writable-stream-default-writer-close-with-error-propagation" aoid="WritableStreamDefaultWriterCloseWithErrorPropagation"
Expand Down
75 changes: 48 additions & 27 deletions reference-implementation/lib/writable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ class WritableStream {
return WritableStreamAbort(this, reason);
}

close() {
if (IsWritableStream(this) === false) {
return promiseRejectedWith(streamBrandCheckException('close'));
}

if (IsWritableStreamLocked(this) === true) {
return promiseRejectedWith(new TypeError('Cannot close a stream that already has a writer'));
}

if (WritableStreamCloseQueuedOrInFlight(this) === true) {
return promiseRejectedWith(new TypeError('Cannot close an already-closing stream'));
}

return WritableStreamClose(this);
}

getWriter() {
if (IsWritableStream(this) === false) {
throw streamBrandCheckException('getWriter');
Expand All @@ -71,6 +87,7 @@ module.exports = {
IsWritableStreamLocked,
WritableStream,
WritableStreamAbort,
WritableStreamClose,
WritableStreamDefaultControllerErrorIfNeeded,
WritableStreamDefaultWriterCloseWithErrorPropagation,
WritableStreamDefaultWriterRelease,
Expand Down Expand Up @@ -192,6 +209,35 @@ function WritableStreamAbort(stream, reason) {
return promise;
}

function WritableStreamClose(stream) {
const state = stream._state;
if (state === 'closed' || state === 'errored') {
return promiseRejectedWith(new TypeError(
`The stream (in ${state} state) is not in the writable state and cannot be closed`));
}

assert(state === 'writable' || state === 'erroring');
assert(WritableStreamCloseQueuedOrInFlight(stream) === false);

const promise = newPromise((resolve, reject) => {
const closeRequest = {
_resolve: resolve,
_reject: reject
};

stream._closeRequest = closeRequest;
});

const writer = stream._writer;
if (writer !== undefined && stream._backpressure === true && state === 'writable') {
defaultWriterReadyPromiseResolve(writer);
}

WritableStreamDefaultControllerClose(stream._writableStreamController);

return promise;
}

// WritableStream API exposed for controllers.

function WritableStreamAddWriteRequest(stream) {
Expand Down Expand Up @@ -501,7 +547,7 @@ class WritableStreamDefaultWriter {
}

if (WritableStreamCloseQueuedOrInFlight(stream) === true) {
return promiseRejectedWith(new TypeError('cannot close an already-closing stream'));
return promiseRejectedWith(new TypeError('Cannot close an already-closing stream'));
}

return WritableStreamDefaultWriterClose(this);
Expand Down Expand Up @@ -565,34 +611,9 @@ function WritableStreamDefaultWriterClose(writer) {

assert(stream !== undefined);

const state = stream._state;
if (state === 'closed' || state === 'errored') {
return promiseRejectedWith(new TypeError(
`The stream (in ${state} state) is not in the writable state and cannot be closed`));
}

assert(state === 'writable' || state === 'erroring');
assert(WritableStreamCloseQueuedOrInFlight(stream) === false);

const promise = newPromise((resolve, reject) => {
const closeRequest = {
_resolve: resolve,
_reject: reject
};

stream._closeRequest = closeRequest;
});

if (stream._backpressure === true && state === 'writable') {
defaultWriterReadyPromiseResolve(writer);
}

WritableStreamDefaultControllerClose(stream._writableStreamController);

return promise;
return WritableStreamClose(stream);
}


function WritableStreamDefaultWriterCloseWithErrorPropagation(writer) {
const stream = writer._ownerWritableStream;

Expand Down
2 changes: 1 addition & 1 deletion reference-implementation/web-platform-tests

0 comments on commit 547428b

Please sign in to comment.