From 632b26a05f3106650b1ec91239ad5b012e6c64af Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Fri, 21 Aug 2015 16:24:13 -0400 Subject: [PATCH] Make reader closed and read() reject after release This significantly simplifies the internals of readers, and their interaction with their owner stream. --- index.bs | 49 +++--------- .../lib/readable-stream.js | 79 +++++++------------ .../test/readable-stream-reader.js | 6 +- .../readable-stream-closed-reader.js | 14 ++++ .../templated/readable-stream-empty-reader.js | 21 ++--- .../readable-stream-errored-reader.js | 21 +++++ ...eadable-stream-two-chunks-closed-reader.js | 11 +-- 7 files changed, 94 insertions(+), 107 deletions(-) diff --git a/index.bs b/index.bs index f4aee142b..9f36bcf3e 100644 --- a/index.bs +++ b/index.bs @@ -750,16 +750,6 @@ Instances of ReadableStreamReader are created with the internal slo A List of promises returned by calls to the reader's read() method that have not yet been resolved, due to the consumer requesting chunks sooner than they are available - - \[[state]] - A string containing the reader's current state, used internally; one of "readable", - "closed", or "errored" - - - \[[storedError]] - A value indicating how the reader's stream failed, to be given as a failure reason or exception when trying to - operate on the reader; applicable only when \[[state]] is "errored" -

new ReadableStreamReader(stream)

@@ -773,19 +763,15 @@ Instances of ReadableStreamReader are created with the internal slo 1. If IsReadableStream(_stream_) is *false*, throw a *TypeError* exception. 1. If IsReadableStreamLocked(_stream_) is *true*, throw a *TypeError* exception. - 1. Set *this*@[[state]] to _stream_@[[state]]. 1. Set *this*@[[readRequests]] to a new empty List. 1. Set *this*@[[ownerReadableStream]] to _stream_. 1. Set _stream_@[[reader]] to *this*. 1. If _stream_@[[state]] is "readable", - 1. Set *this*@[[storedError]] to *undefined*. 1. Set *this*@[[closedPromise]] to a new promise. 1. Otherwise, if _stream_@[[state]] is "closed", - 1. Set *this*@[[storedError]] to *undefined*. 1. Set *this*@[[closedPromise]] to a new promise resolved with *undefined*. 1. Otherwise, 1. Assert: _stream_@[[state]] is "errored". - 1. Set *this*@[[storedError]] to _stream_@[[storedError]]. 1. Set *this*@[[closedPromise]] to a new promise rejected with _stream_@[[storedError]]. @@ -812,11 +798,7 @@ Instances of ReadableStreamReader are created with the internal slo 1. If IsReadableStreamReader(*this*) is *false*, return a promise rejected with a *TypeError* exception. - 1. If *this*@[[ownerReadableStream]] is not *undefined*, set *this*@[[ownerReadableStream]]@[[disturbed]] to *true*. - 1. If *this*@[[state]] is "closed", return a new promise resolved with *undefined*. - 1. If *this*@[[state]] is "errored", return a new promise rejected with *this*@[[storedError]]. - 1. Assert: *this*@[[ownerReadableStream]] is not *undefined*. - 1. Assert: *this*@[[ownerReadableStream]]@[[state]] is "readable". + 1. If *this*@[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception. 1. Return CancelReadableStream(*this*@[[ownerReadableStream]], _reason_). @@ -839,6 +821,7 @@ Instances of ReadableStreamReader are created with the internal slo 1. If IsReadableStreamReader(*this*) is *false*, return a promise rejected with a *TypeError* exception. + 1. If *this*@[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception. 1. Return ReadFromReadableStreamReader(*this*). @@ -859,7 +842,8 @@ Instances of ReadableStreamReader are created with the internal slo 1. If IsReadableStreamReader(*this*) is *false*, throw a *TypeError* exception. 1. If *this*@[[ownerReadableStream]] is *undefined*, return *undefined*. 1. If *this*@[[readRequests]] is not empty, throw a *TypeError* exception. - 1. If *this*@[[state]] is `"readable"`, perform CloseReadableStreamReader(*this*). + 1. If *this*@[[ownerReadableStream]]@[[state]] is `"readable"`, reject *this*@[[closedPromise]] with a *TypeError* exception. + 1. Otherwise, set *this*@[[closedPromise]] be a new promise rejected with a *TypeError* exception. 1. Set *this*@[[ownerReadableStream]]@[[reader]] to *undefined*. 1. Set *this*@[[ownerReadableStream]] to *undefined*. 1. Return *undefined*. @@ -911,17 +895,6 @@ this to streams they did not create, and must ensure they have obeyed the precon control of the underlying source. -

CloseReadableStreamReader ( reader )

- - - 1. Repeat for each _readRequestPromise_ that is an element of _reader_@[[readRequests]], - 1. Resolve _readRequestPromise_ with CreateIterResultObject(*undefined*, *true*). - 1. Set _reader_@[[readRequests]] to an empty List. - 1. Set _reader_@[[state]] to "closed". - 1. Resolve _reader_@[[closedPromise]] with *undefined*. - 1. Return *undefined*. - -

EnqueueInReadableStream ( stream, chunk )

This abstract operation can be called by other specifications that wish to enqueue chunks in a readable stream, @@ -978,8 +951,6 @@ an assert). 1. Repeat for each _readRequestPromise_ that is an element of _reader_@[[readRequests]], 1. Reject _readRequestPromise_ with _e_. 1. Set _reader_@[[readRequests]] to a new empty List. - 1. Set _reader_@[[storedError]] to _e_. - 1. Set _reader_@[[state]] to "errored". 1. Reject _reader_@[[closedPromise]] with _e_. 1. Return *undefined*. @@ -990,7 +961,11 @@ an assert). 1. Assert: _stream_@[[state]] is "readable". 1. Set _stream_@[[state]] to "closed". 1. Let _reader_ be _stream_@[[reader]]. - 1. If _reader_ is not *undefined*, perform CloseReadableStreamReader(_reader_). + 1. If _reader_ is *undefined*, return *undefined*. + 1. Repeat for each _readRequestPromise_ that is an element of _reader_@[[readRequests]], + 1. Resolve _readRequestPromise_ with CreateIterResultObject(*undefined*, *true*). + 1. Set _reader_@[[readRequests]] to an empty List. + 1. Resolve _reader_@[[closedPromise]] with *undefined*. 1. Return *undefined*. @@ -1049,10 +1024,10 @@ readable stream is locked to a reader.

ReadFromReadableStreamReader ( reader )

- 1. If *this*@[[ownerReadableStream]] is not *undefined*, set *this*@[[ownerReadableStream]]@[[disturbed]] to *true*. - 1. If _reader_@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(*undefined*, *true*). - 1. If _reader_@[[state]] is "errored", return a new promise rejected with _reader_@[[storedError]]. 1. Assert: _reader_@[[ownerReadableStream]] is not *undefined*. + 1. Set _reader_@[[ownerReadableStream]]@[[disturbed]] to *true*. + 1. If _reader_@[[ownerReadableStream]]@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(*undefined*, *true*). + 1. If _reader_@[[ownerReadableStream]]@[[state]] is "errored", return a new promise rejected with _reader_@[[ownerReadableStream]]@[[storedError]]. 1. Assert: _reader_@[[ownerReadableStream]]@[[state]] is "readable". 1. If _reader_@[[ownerReadableStream]]@[[queue]] is not empty, 1. Let _chunk_ be DequeueValue(_reader_@[[ownerReadableStream]]@[[queue]]). diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 7b6186676..b81771116 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -257,27 +257,19 @@ class ReadableStreamReader { this._ownerReadableStream = stream; stream._reader = this; - this._state = stream._state; this._readRequests = []; if (stream._state === 'readable') { - this._storedError = undefined; - this._closedPromise = new Promise((resolve, reject) => { this._closedPromise_resolve = resolve; this._closedPromise_reject = reject; }); } else if (stream._state === 'closed') { - this._storedError = undefined; - this._closedPromise = Promise.resolve(undefined); this._closedPromise_resolve = undefined; this._closedPromise_reject = undefined; } else { assert(stream._state === 'errored'); - - this._storedError = stream._storedError; - this._closedPromise = Promise.reject(stream._storedError); this._closedPromise_resolve = undefined; this._closedPromise_reject = undefined; @@ -299,21 +291,10 @@ class ReadableStreamReader { new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader')); } - if (this._ownerReadableStream !== undefined) { - this._ownerReadableStream._disturbed = true; - } - - if (this._state === 'closed') { - return Promise.resolve(undefined); - } - - if (this._state === 'errored') { - return Promise.reject(this._storedError); + if (this._ownerReadableStream === undefined) { + return Promise.reject(new TypeError('Cannot cancel a stream using a released reader')); } - assert(this._ownerReadableStream !== undefined); - assert(this._ownerReadableStream._state === 'readable'); - return CancelReadableStream(this._ownerReadableStream, reason); } @@ -323,6 +304,10 @@ class ReadableStreamReader { new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader')); } + if (this._ownerReadableStream === undefined) { + return Promise.reject(new TypeError('Cannot read from a released reader')); + } + return ReadFromReadableStreamReader(this); } @@ -339,8 +324,12 @@ class ReadableStreamReader { throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled'); } - if (this._state === 'readable') { - CloseReadableStreamReader(this); + if (this._ownerReadableStream._state === 'readable') { + this._closedPromise_reject( + new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness')); + } else { + this._closedPromise = Promise.reject( + new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness')); } this._ownerReadableStream._reader = undefined; @@ -389,21 +378,6 @@ function CloseReadableStream(stream) { } } -function CloseReadableStreamReader(reader) { - for (const { _resolve } of reader._readRequests) { - _resolve(CreateIterResultObject(undefined, true)); - } - reader._readRequests = []; - - reader._state = 'closed'; - - reader._closedPromise_resolve(undefined); - reader._closedPromise_resolve = undefined; - reader._closedPromise_reject = undefined; - - return undefined; -} - function EnqueueInReadableStream(stream, chunk) { assert(stream._closeRequested === false); assert(stream._state !== 'errored'); @@ -460,9 +434,6 @@ function ErrorReadableStream(stream, e) { } reader._readRequests = []; - reader._storedError = e; - reader._state = 'errored'; - reader._closedPromise_reject(e); reader._closedPromise_resolve = undefined; reader._closedPromise_reject = undefined; @@ -477,10 +448,19 @@ function FinishClosingReadableStream(stream) { const reader = stream._reader; - if (reader !== undefined) { - CloseReadableStreamReader(reader); + if (reader === undefined) { + return undefined; } + for (const { _resolve } of reader._readRequests) { + _resolve(CreateIterResultObject(undefined, true)); + } + reader._readRequests = []; + + reader._closedPromise_resolve(undefined); + reader._closedPromise_resolve = undefined; + reader._closedPromise_reject = undefined; + return undefined; } @@ -542,19 +522,18 @@ function IsReadableStreamReader(x) { } function ReadFromReadableStreamReader(reader) { - if (reader._ownerReadableStream !== undefined) { - reader._ownerReadableStream._disturbed = true; - } + assert(reader._ownerReadableStream !== undefined); - if (reader._state === 'closed') { + reader._ownerReadableStream._disturbed = true; + + if (reader._ownerReadableStream._state === 'closed') { return Promise.resolve(CreateIterResultObject(undefined, true)); } - if (reader._state === 'errored') { - return Promise.reject(reader._storedError); + if (reader._ownerReadableStream._state === 'errored') { + return Promise.reject(reader._ownerReadableStream._storedError); } - assert(reader._ownerReadableStream !== undefined); assert(reader._ownerReadableStream._state === 'readable'); if (reader._ownerReadableStream._queue.length > 0) { diff --git a/reference-implementation/test/readable-stream-reader.js b/reference-implementation/test/readable-stream-reader.js index c10b2c427..0881339e5 100644 --- a/reference-implementation/test/readable-stream-reader.js +++ b/reference-implementation/test/readable-stream-reader.js @@ -173,7 +173,7 @@ test('closed should be fulfilled after stream is closed (.closed access before a controller.close(); }); -test('closed should be fulfilled after reader releases its lock (multiple stream locks)', t => { +test('closed should be rejected after reader releases its lock (multiple stream locks)', t => { t.plan(2); let controller; @@ -190,8 +190,8 @@ test('closed should be fulfilled after reader releases its lock (multiple stream const reader2 = rs.getReader(); controller.close(); - reader1.closed.then(() => { - t.pass('reader1 closed should be fulfilled'); + reader1.closed.catch(e => { + t.equal(e.constructor, TypeError, 'reader1 closed should be rejected with a TypeError'); }); reader2.closed.then(() => { diff --git a/reference-implementation/test/templated/readable-stream-closed-reader.js b/reference-implementation/test/templated/readable-stream-closed-reader.js index 8af8518ee..178869ae5 100644 --- a/reference-implementation/test/templated/readable-stream-closed-reader.js +++ b/reference-implementation/test/templated/readable-stream-closed-reader.js @@ -46,6 +46,20 @@ export default (label, factory) => { ); }); + test('releasing the lock should cause closed to reject and change identity', t => { + t.plan(3); + const { reader } = factory(); + + const closedBefore = reader.closed; + reader.releaseLock(); + const closedAfter = reader.closed; + + t.notEqual(closedBefore, closedAfter, 'the closed promise should change identity') + closedBefore.then(v => t.equal(v, undefined, 'reader.closed acquired before release should fulfill')); + closedAfter.catch( + e => t.equal(e.constructor, TypeError, 'reader.closed acquired after release should reject with a TypeError')); + }); + test('cancel() should return a distinct fulfilled promise each time', t => { t.plan(5); const { reader } = factory(); diff --git a/reference-implementation/test/templated/readable-stream-empty-reader.js b/reference-implementation/test/templated/readable-stream-empty-reader.js index 9fb1d6ac2..082212831 100644 --- a/reference-implementation/test/templated/readable-stream-empty-reader.js +++ b/reference-implementation/test/templated/readable-stream-empty-reader.js @@ -67,7 +67,7 @@ export default (label, factory) => { }); test('releasing the lock with pending read requests should throw but the read requests should stay pending', t => { - const { reader } = factory(); + const { stream, reader } = factory(); reader.read().then( () => t.fail('first read() should not fulfill'), @@ -86,30 +86,31 @@ export default (label, factory) => { t.throws(() => reader.releaseLock(), /TypeError/, 'releaseLock should throw a TypeError'); + t.equal(stream.locked, true, 'the stream should still be locked'); + setTimeout(() => t.end(), 50); }); - test('releasing the lock should cause further read() calls to resolve as if the stream is closed', t => { + test('releasing the lock should cause further read() calls to reject with a TypeError', t => { t.plan(2); const { reader } = factory(); reader.releaseLock(); - reader.read().then(r => - t.deepEqual(r, { value: undefined, done: true }, 'first read() should return closed result')); - reader.read().then(r => - t.deepEqual(r, { value: undefined, done: true }, 'second read() should return closed result')); + reader.read().catch(e => t.equal(e.constructor, TypeError, 'first read() should reject with a TypeError')); + reader.read().catch(e => t.equal(e.constructor, TypeError, 'second read() should reject with a TypeError')); }); - test('releasing the lock should cause closed to fulfill', t => { + test('releasing the lock should cause closed to reject', t => { t.plan(2); const { reader } = factory(); - reader.closed.then(v => t.equal(v, undefined, 'reader.closed got before release should fulfill with undefined')); - + const closedBefore = reader.closed; reader.releaseLock(); + const closedAfter = reader.closed; - reader.closed.then(v => t.equal(v, undefined, 'reader.closed got after release should fulfill with undefined')); + t.equal(closedBefore, closedAfter, 'the closed promise should not change identity') + closedBefore.catch(e => t.equal(e.constructor, TypeError, 'reader.closed should reject with a TypeError')); }); test('releasing the lock should cause locked to become false', t => { diff --git a/reference-implementation/test/templated/readable-stream-errored-reader.js b/reference-implementation/test/templated/readable-stream-errored-reader.js index 1b81ff8a9..7be5d0c88 100644 --- a/reference-implementation/test/templated/readable-stream-errored-reader.js +++ b/reference-implementation/test/templated/readable-stream-errored-reader.js @@ -15,6 +15,27 @@ export default (label, factory, error) => { ); }); + test('releasing the lock should cause closed to reject and change identity', t => { + t.plan(3); + const { reader } = factory(); + + const closedBefore = reader.closed; + + closedBefore.catch(e => { + t.equal(e, error, 'reader.closed acquired before release should reject with the error'); + + reader.releaseLock(); + const closedAfter = reader.closed; + + t.notEqual(closedBefore, closedAfter, 'the closed promise should change identity'); + + return closedAfter.catch(e => { + t.equal(e.constructor, TypeError, 'reader.closed acquired after release should reject with a TypeError'); + }); + }) + .catch(e => t.error(e)); + }); + test('read() should reject with the error', t => { t.plan(1); const { reader } = factory(); diff --git a/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js b/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js index c5d075fc8..4ea6138f1 100644 --- a/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js +++ b/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js @@ -67,18 +67,15 @@ export default (label, factory, chunks) => { reader.read(); }); - test('releasing the lock should cause read() to act as if the stream is closed', t => { + test('releasing the lock should cause further read() calls to reject with a TypeError', t => { t.plan(3); const { reader } = factory(); reader.releaseLock(); - reader.read().then(r => - t.deepEqual(r, { value: undefined, done: true }, 'first read() should return closed result')); - reader.read().then(r => - t.deepEqual(r, { value: undefined, done: true }, 'second read() should return closed result')); - reader.read().then(r => - t.deepEqual(r, { value: undefined, done: true }, 'third read() should return closed result')); + reader.read().catch(e => t.equal(e.constructor, TypeError, 'first read() should reject with a TypeError')); + reader.read().catch(e => t.equal(e.constructor, TypeError, 'second read() should reject with a TypeError')); + reader.read().catch(e => t.equal(e.constructor, TypeError, 'third read() should reject with a TypeError')); }); test('reader\'s closed property always returns the same promise', t => {