diff --git a/index.bs b/index.bs index 7941a24a0..d238f6f1e 100644 --- a/index.bs +++ b/index.bs @@ -1035,11 +1035,6 @@ Instances of WritableStream are created with the internal slots des A promise that becomes fulfilled when the stream becomes "closed"; returned by the closed getter - - \[[error]] - A Writable Stream Error Function created with the ability to move this stream to an - "errored" state - \[[queue]] A List representing the stream's internal queue of pending writes @@ -1127,15 +1122,26 @@ Instances of WritableStream are created with the internal slots des 1. Set *this*@[[queue]] to a new empty List. 1. Set *this*@[[state]] to "writable". 1. Set *this*@[[started]] and *this*@[[writing]] to *false*. - 1. Set *this*@[[error]] to CreateWritableStreamErrorFunction(*this*). 1. Call-with-rethrow SyncWritableStreamStateWithQueue(*this*). - 1. Let _startResult_ be InvokeOrNoop(_underlyingSink_, "start", «‍*this*@[[error]]»). + 1. Let _error_ be a new WritableStream error function. + 1. Set _error_@[[stream]] to *this*. + 1. Let _startResult_ be InvokeOrNoop(_underlyingSink_, "start", «_error_»). 1. ReturnIfAbrupt(_startResult_). 1. Set *this*@[[startedPromise]] to the result of resolving _startResult_ as a promise. 1. Upon fulfillment, 1. Set *this*@[[started]] to *true*. 1. Set *this*@[[startedPromise]] to *undefined*. - 1. Upon rejection with reason _r_, call-with-rethrow Call(*this*@[[error]], *undefined*, «‍_r_»). + 1. Upon rejection with reason _r_, return ErrorWritableStream(*this*, _r_). + + +A WritableStream error function is an anonymous built-in function that is used to allow +underlying sinks to error their associated writable stream. Each WritableStream error function has +a \[[stream]] internal slot. When a WritableStream error function F is called with argument +e, it performs the following steps: + +
+  1. Let _stream_ be _F_@[[stream]].
+  1. Return ErrorWritableStream(_stream_, _e_).
 

Properties of the WritableStream Prototype

@@ -1211,7 +1217,7 @@ Instances of WritableStream are created with the internal slots des 1. If IsWritableStream(*this*) is *false*, return a promise rejected with a *TypeError* exception. 1. If *this*@[[state]] is "closed", return a new promise resolved with *undefined*. 1. If *this*@[[state]] is "errored", return a new promise rejected with *this*@[[storedError]]. - 1. Call-with-rethrow Call(*this*@[[error]], *undefined*, «‍reason»). + 1. Call-with-rethrow ErrorWritableStream(*this*, _reason_). 1. Let _sinkAbortPromise_ be PromiseInvokeOrFallbackOrNoop(*this*@[[underlyingSink]], "abort", «_reason_», "close", «»). 1. Return the result of transforming _sinkAbortPromise_ by a fulfillment handler that returns *undefined*. @@ -1257,24 +1263,24 @@ Instances of WritableStream are created with the internal slots des 1. Let _chunkSize_ be *1*. 1. Let _strategy_ be Get(*this*@[[underlyingSink]], "strategy"). 1. If _strategy_ is an abrupt completion, - 1. Call-with-rethrow Call(*this*@[[error]], *undefined*, «‍_strategy_.[[value]]»). + 1. Call-with-rethrow ErrorWritableStream(*this*, _strategy_.[[value]]). 1. Return a new promise rejected with _strategy_.[[value]]. 1. Set _strategy_ to _strategy_.[[value]]. 1. If _strategy_ is not *undefined*, then 1. Set _chunkSize_ to Invoke(_strategy_, "size", «‍_chunk_»). 1. If _chunkSize_ is an abrupt completion, - 1. Call-with-rethrow Call(*this*@[[error]], *undefined*, «‍_chunkSize_.[[value]]»). + 1. Call-with-rethrow ErrorWritableStream(*this*, _chunkSize_.[[value]]). 1. Return a new promise rejected with _chunkSize_.[[value]]. 1. Set _chunkSize_ to _chunkSize_.[[value]]. 1. Let _promise_ be a new promise. 1. Let _writeRecord_ be Record{[[promise]]: _promise_, [[chunk]]: _chunk_}. 1. Let _enqueueResult_ be EnqueueValueWithSize(*this*@[[queue]], _writeRecord_, _chunkSize_). 1. If _enqueueResult_ is an abrupt completion, - 1. Call-with-rethrow Call(*this*@[[error]], *undefined*, «‍_enqueueResult_.[[value]]»). + 1. Call-with-rethrow ErrorWritableStream(*this*, _enqueueResult_.[[value]]). 1. Return a new promise rejected with _enqueueResult_.[[value]]. 1. Let _syncResult_ be SyncWritableStreamStateWithQueue(*this*). 1. If _syncResult_ is an abrupt completion, - 1. Call-with-rethrow Call(*this*@[[error]], *undefined*, «‍_syncResult_.[[value]]»). + 1. Call-with-rethrow ErrorWritableStream(*this*, _syncResult_.[[value]]). 1. Return _promise_. 1. Call-with-rethrow CallOrScheduleWritableStreamAdvanceQueue(*this*). 1. Return _promise_. @@ -1301,18 +1307,11 @@ Instances of WritableStream are created with the internal slots des 1. Assert: _stream_@[[state]] is "closing". 1. Resolve _stream_@[[closedPromise]] with *undefined*. 1. Set _stream_@[[state]] to "closed". - 1. Upon rejection with reason _r_, call-with-rethrow Call(_stream_@[[error]], *undefined*, «‍_r_»). + 1. Upon rejection with reason _r_, return ErrorWritableStream(_stream_, _r_). 1. Return *undefined*. -

CreateWritableStreamErrorFunction ( stream )

- -
-  1. Return a new Writable Stream Error Function closing over _stream_.
-
- -A Writable Stream Error Function is a built-in anonymous function of one argument e, closing over -a variable stream, that performs the following steps: +

ErrorWritableStream ( stream, e )

   1. If _stream_@[[state]] is "closed" or "errored", return *undefined*.
@@ -1323,6 +1322,7 @@ a variable stream, that performs the following steps:
   1. If _stream_@[[state]] is "waiting", resolve _stream_@[[readyPromise]] with *undefined*.
   1. Reject _stream_@[[closedPromise]] with _e_.
   1. Set _stream_@[[state]] to "errored".
+  1. Return *undefined*.
 

IsWritableStream ( x )

@@ -1373,9 +1373,9 @@ a variable stream, that performs the following steps: 1. Resolve _writeRecord_.[[promise]] with *undefined*. 1. DequeueValue(_stream_@[[queue]]). 1. Let _syncResult_ be SyncWritableStreamStateWithQueue(_stream_). - 1. If _syncResult_ is an abrupt completion, then call-with-rethrow Call(_stream_@[[error]], *undefined*, «‍_syncResult_.[[value]]»). + 1. If _syncResult_ is an abrupt completion, then return ErrorWritableStream(_stream_, ‍_syncResult_.[[value]]). 1. Otherwise, return WritableStreamAdvanceQueue(_stream_). - 1. Upon rejection of _writeResult_ with reason _r_, call-with-rethrow Call(_stream_@[[error]], *undefined*, «‍_r_»). + 1. Upon rejection of _writeResult_ with reason _r_, return ErrorWritableStream(_stream_, _r_).

Transform Streams

@@ -2150,9 +2150,6 @@ itself will evolve in these ways. ReturnIfAbrupt(opResult)."
  • We use the shorthand phrases from the W3C TAG promises guide to operate on promises at a higher level than the ECMAScript spec does. -
  • We introduce the notion of creating a function that "closes over" a given variable. This is meant to work the - same as how the ECMAScript spec gives such functions internal slots which get filled in upon creation and then - have their values pulled out of during execution, but require less formal contortions.

    Acknowledgments

    diff --git a/reference-implementation/lib/writable-stream.js b/reference-implementation/lib/writable-stream.js index e972a371c..00e5ba5ff 100644 --- a/reference-implementation/lib/writable-stream.js +++ b/reference-implementation/lib/writable-stream.js @@ -20,17 +20,18 @@ export default class WritableStream { this._started = false; this._writing = false; - this._error = CreateWritableStreamErrorFunction(this); + const error = closure_WritableStreamErrorFunction(); + error._stream = this; SyncWritableStreamStateWithQueue(this); - const startResult = InvokeOrNoop(underlyingSink, 'start', [this._error]); + const startResult = InvokeOrNoop(underlyingSink, 'start', [error]); this._startedPromise = Promise.resolve(startResult); this._startedPromise.then(() => { this._started = true; this._startedPromise = undefined; }); - this._startedPromise.catch(r => this._error(r)); + this._startedPromise.catch(r => ErrorWritableStream(this, r)); } get closed() { @@ -61,7 +62,7 @@ export default class WritableStream { return Promise.reject(this._storedError); } - this._error(reason); + ErrorWritableStream(this, reason); const sinkAbortPromise = PromiseInvokeOrFallbackOrNoop(this._underlyingSink, 'abort', [reason], 'close', []); return sinkAbortPromise.then(() => undefined); } @@ -122,7 +123,7 @@ export default class WritableStream { try { strategy = this._underlyingSink.strategy; } catch (strategyE) { - this._error(strategyE); + ErrorWritableStream(this, strategyE); return Promise.reject(strategyE); } @@ -130,7 +131,7 @@ export default class WritableStream { try { chunkSize = strategy.size(chunk); } catch (chunkSizeE) { - this._error(chunkSizeE); + ErrorWritableStream(this, chunkSizeE); return Promise.reject(chunkSizeE); } } @@ -145,14 +146,14 @@ export default class WritableStream { try { EnqueueValueWithSize(this._queue, writeRecord, chunkSize); } catch (enqueueResultE) { - this._error(enqueueResultE); + ErrorWritableStream(this, enqueueResultE); return Promise.reject(enqueueResultE); } try { SyncWritableStreamStateWithQueue(this); } catch (syncResultE) { - this._error(syncResultE); + ErrorWritableStream(this, syncResultE); return promise; } @@ -161,6 +162,12 @@ export default class WritableStream { } } +function closure_WritableStreamErrorFunction() { + const f = e => ErrorWritableStream(f._stream, e); + return f; +} + + function CallOrScheduleWritableStreamAdvanceQueue(stream) { if (stream._started === false) { stream._startedPromise.then(() => { @@ -189,33 +196,29 @@ function CloseWritableStream(stream) { stream._closedPromise_resolve(undefined); stream._state = 'closed'; }, - r => { - stream._error(r); - } + r => ErrorWritableStream(stream, r) ); } -function CreateWritableStreamErrorFunction(stream) { - return e => { - if (stream._state === 'closed' || stream._state === 'errored') { - return undefined; - } +function ErrorWritableStream(stream, e) { + if (stream._state === 'closed' || stream._state === 'errored') { + return undefined; + } - while (stream._queue.length > 0) { - const writeRecord = DequeueValue(stream._queue); - if (writeRecord !== 'close') { - writeRecord._reject(e); - } + while (stream._queue.length > 0) { + const writeRecord = DequeueValue(stream._queue); + if (writeRecord !== 'close') { + writeRecord._reject(e); } + } - stream._storedError = e; + stream._storedError = e; - if (stream._state === 'waiting') { - stream._readyPromise_resolve(undefined); - } - stream._closedPromise_reject(e); - stream._state = 'errored'; - }; + if (stream._state === 'waiting') { + stream._readyPromise_resolve(undefined); + } + stream._closedPromise_reject(e); + stream._state = 'errored'; } export function IsWritableStream(x) { @@ -290,14 +293,11 @@ function WritableStreamAdvanceQueue(stream) { try { SyncWritableStreamStateWithQueue(stream); } catch (syncResultE) { - stream._error(syncResultE); - return; + return ErrorWritableStream(stream, syncResultE); } return WritableStreamAdvanceQueue(stream); }, - r => { - stream._error(r); - } + r => ErrorWritableStream(stream, r) ) .catch(e => process.nextTick(() => { throw e; })); // to catch assertion failures }