From ddcf083b4822e7de55dc32df399ce77a136423bd Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Tue, 18 Aug 2015 15:05:16 -0400 Subject: [PATCH 1/4] Remove automatic unlocking on close/error Per #385, the current auto-unlocking design interferes with the implementation of the disturbed flag for fetch. It was never that useful anyway, so removing it is probably the best solution. --- index.bs | 20 +++--------- .../lib/readable-stream.js | 31 +++++++------------ .../test/readable-stream-reader.js | 19 +++++++----- .../test/templated/readable-stream-closed.js | 22 +++++++++++-- .../readable-stream-errored-sync-only.js | 24 +++++++------- ...eadable-stream-two-chunks-closed-reader.js | 18 ++++++----- 6 files changed, 70 insertions(+), 64 deletions(-) diff --git a/index.bs b/index.bs index 07a9adba5..f58835486 100644 --- a/index.bs +++ b/index.bs @@ -770,18 +770,16 @@ Instances of ReadableStreamReader are created with the internal slo 1. If IsReadableStreamLocked(_stream_) is *true*, throw a *TypeError* exception. 1. Set *this*@[[state]] to _stream_@[[state]]. 1. Set *this*@[[readRequests]] to a new empty List. + 1. Set *this*@[[ownerReadableStream]] to _stream_. + 1. Set _stream_@[[reader]] to *this*. 1. If _stream_@[[state]] is "readable", - 1. Set *this*@[[ownerReadableStream]] to _stream_. 1. Set *this*@[[storedError]] to *undefined*. 1. Set *this*@[[closedPromise]] to a new promise. - 1. Set _stream_@[[reader]] to *this*. 1. Otherwise, if _stream_@[[state]] is "closed", - 1. Set *this*@[[ownerReadableStream]] to *undefined*. 1. Set *this*@[[storedError]] to *undefined*. 1. Set *this*@[[closedPromise]] to a new promise resolved with *undefined*. 1. Otherwise, 1. Assert: _stream_@[[state]] is "errored". - 1. Set *this*@[[ownerReadableStream]] to *undefined*. 1. Set *this*@[[storedError]] to _stream_@[[storedError]]. 1. Set *this*@[[closedPromise]] to a new promise rejected with _stream_@[[storedError]]. @@ -855,7 +853,9 @@ Instances of ReadableStreamReader are created with the internal slo 1. If IsReadableStreamReader(*this*) is *false*, throw a *TypeError* exception. 1. If *this*@[[ownerReadableStream]] is *undefined*, return *undefined*. 1. If *this*@[[readRequests]] is not empty, throw a *TypeError* exception. - 1. Perform CloseReadableStreamReader(*this*). + 1. If *this*@[[state]] is `"readable"`, perform CloseReadableStreamReader(*this*). + 1. Set *this*@[[ownerReadableStream]]@[[reader]] to *undefined*. + 1. Set *this*@[[ownerReadableStream]] to *undefined*. 1. Return *undefined*. @@ -910,7 +910,6 @@ this to streams they did not create, and must ensure they have obeyed the precon 1. Repeat for each _readRequestPromise_ that is an element of _reader_@[[readRequests]], 1. Resolve _readRequestPromise_ with CreateIterResultObject(*undefined*, *true*). 1. Set _reader_@[[readRequests]] to an empty List. - 1. Perform ReleaseReadableStreamReader(_reader_). 1. Set _reader_@[[state]] to "closed". 1. Resolve _reader_@[[closedPromise]] with *undefined*. 1. Return *undefined*. @@ -972,7 +971,6 @@ an assert). 1. Repeat for each _readRequestPromise_ that is an element of _reader_@[[readRequests]], 1. Reject _readRequestPromise_ with _e_. 1. Set _reader_@[[readRequests]] to a new empty List. - 1. Perform ReleaseReadableStreamReader(_reader_). 1. Set _reader_@[[storedError]] to _e_. 1. Set _reader_@[[state]] to "errored". 1. Reject _reader_@[[closedPromise]] with _e_. @@ -1050,14 +1048,6 @@ readable stream is locked to a reader. 1. Return _readRequestPromise_. -

ReleaseReadableStreamReader ( reader )

- - - 1. Assert: _reader_@[[ownerReadableStream]] is not *undefined*. - 1. Set _reader_@[[ownerReadableStream]]@[[reader]] to *undefined*. - 1. Set _reader_@[[ownerReadableStream]] to *undefined*. - -

RequestReadableStreamPull ( stream )

diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index f550f02bf..b28b4c41d 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -112,7 +112,8 @@ export default class ReadableStream { lastWrite = dest.write(value); doPipe(); } - }); + }) + .catch(rethrowAssertionErrorRejection); // Any failures will be handled by listening to reader.closed and dest.closed above. // TODO: handle malicious dest.write/dest.close? @@ -120,8 +121,8 @@ export default class ReadableStream { function cancelSource(reason) { if (preventCancel === false) { - // cancelling automatically releases the lock (and that doesn't fail, since source is then closed) reader.cancel(reason); + reader.releaseLock(); rejectPipeToPromise(reason); } else { // If we don't cancel, we need to wait for lastRead to finish before we're allowed to release. @@ -251,21 +252,20 @@ class ReadableStreamReader { throw new TypeError('This stream has already been locked for exclusive reading by another reader'); } + this._ownerReadableStream = stream; + stream._reader = this; + this._state = stream._state; this._readRequests = []; if (stream._state === 'readable') { - this._ownerReadableStream = stream; this._storedError = undefined; this._closedPromise = new Promise((resolve, reject) => { this._closedPromise_resolve = resolve; this._closedPromise_reject = reject; }); - - stream._reader = this; } else if (stream._state === 'closed') { - this._ownerReadableStream = undefined; this._storedError = undefined; this._closedPromise = Promise.resolve(undefined); @@ -274,7 +274,6 @@ class ReadableStreamReader { } else { assert(stream._state === 'errored'); - this._ownerReadableStream = undefined; this._storedError = stream._storedError; this._closedPromise = Promise.reject(stream._storedError); @@ -334,7 +333,12 @@ class ReadableStreamReader { throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled'); } - CloseReadableStreamReader(this); + if (this._state === 'readable') { + CloseReadableStreamReader(this); + } + + this._ownerReadableStream._reader = undefined; + this._ownerReadableStream = undefined; return undefined; } @@ -383,8 +387,6 @@ function CloseReadableStreamReader(reader) { } reader._readRequests = []; - ReleaseReadableStreamReader(reader); - reader._state = 'closed'; reader._closedPromise_resolve(undefined); @@ -450,8 +452,6 @@ function ErrorReadableStream(stream, e) { } reader._readRequests = []; - ReleaseReadableStreamReader(reader); - reader._storedError = e; reader._state = 'errored'; @@ -562,13 +562,6 @@ function ReadFromReadableStreamReader(reader) { } } -function ReleaseReadableStreamReader(reader) { - assert(reader._ownerReadableStream !== undefined); - - reader._ownerReadableStream._reader = undefined; - reader._ownerReadableStream = undefined; -} - function RequestReadableStreamPull(stream) { const shouldPull = ShouldReadableStreamPull(stream); if (shouldPull === false) { diff --git a/reference-implementation/test/readable-stream-reader.js b/reference-implementation/test/readable-stream-reader.js index e337eee8f..c10b2c427 100644 --- a/reference-implementation/test/readable-stream-reader.js +++ b/reference-implementation/test/readable-stream-reader.js @@ -136,13 +136,14 @@ test('Reading from a reader for an empty stream will wait until a chunk is avail controller.enqueue('a'); }); -test('cancel() on a reader releases the reader before calling through', t => { - t.plan(3); +test('cancel() on a reader does not release the reader', t => { + t.plan(4); const passedReason = new Error('it wasn\'t the right time, sorry'); const rs = new ReadableStream({ cancel(reason) { - t.doesNotThrow(() => rs.getReader(), 'should be able to get another reader without error'); + t.equal(rs.locked, true, 'the stream should still be locked'); + t.throws(() => rs.getReader(), /TypeError/, 'should not be able to get another reader'); t.equal(reason, passedReason, 'the cancellation reason is passed through to the underlying source'); } }); @@ -259,7 +260,7 @@ test('cancel() on a released reader is a no-op and does not pass through', t => setTimeout(() => t.end(), 50); }); -test('Getting a second reader after erroring the stream should succeed', t => { +test('Getting a second reader after erroring the stream and releasing the reader should succeed', t => { t.plan(5); let controller; @@ -284,12 +285,16 @@ test('Getting a second reader after erroring the stream should succeed', t => { controller.error(theError); - rs.getReader().closed.catch(e => { + reader1.releaseLock(); + + const reader2 = rs.getReader(); + + reader2.closed.catch(e => { t.equal(e, theError, 'the second reader closed getter should be rejected with the error'); }); - rs.getReader().read().catch(e => { - t.equal(e, theError, 'the third reader read() should be rejected with the error'); + reader2.read().catch(e => { + t.equal(e, theError, 'the second reader read() should be rejected with the error'); }); }); diff --git a/reference-implementation/test/templated/readable-stream-closed.js b/reference-implementation/test/templated/readable-stream-closed.js index 70b2f81bb..66d01edaf 100644 --- a/reference-implementation/test/templated/readable-stream-closed.js +++ b/reference-implementation/test/templated/readable-stream-closed.js @@ -93,13 +93,29 @@ export default (label, factory) => { .catch(e => t.error(e)); }); - test('should be able to acquire multiple readers, since they are all auto-released', t => { + test('should be able to acquire multiple readers if they are released in succession', t => { const rs = factory(); - rs.getReader(); + const reader = rs.getReader(); + reader.releaseLock(); + + t.doesNotThrow(() => { + const reader = rs.getReader(); + reader.releaseLock(); + }, 'getting a second reader should not throw'); - t.doesNotThrow(() => rs.getReader(), 'getting a second reader should not throw'); t.doesNotThrow(() => rs.getReader(), 'getting a third reader should not throw'); + + t.end(); + }); + + test('should not be able to acquire a second reader if we don\'t release the first one', t => { + const rs = factory(); + + rs.getReader(); + + t.throws(() => rs.getReader(), /TypeError/, 'getting a second reader should throw'); + t.throws(() => rs.getReader(), /TypeError/, 'getting a third reader should throw'); t.end(); }); }; diff --git a/reference-implementation/test/templated/readable-stream-errored-sync-only.js b/reference-implementation/test/templated/readable-stream-errored-sync-only.js index c7e16fff7..dfb4e4790 100644 --- a/reference-implementation/test/templated/readable-stream-errored-sync-only.js +++ b/reference-implementation/test/templated/readable-stream-errored-sync-only.js @@ -5,11 +5,11 @@ export default (label, factory, error) => { tapeTest(`${label}: ${description}`, testFn); } - test('should be able to obtain a second reader, with the correct closed promise', t => { + test('should be able to obtain a second reader after releasing the first, with the correct closed promise', t => { t.plan(2); const rs = factory(); - rs.getReader(); + rs.getReader().releaseLock(); let reader; t.doesNotThrow(() => reader = rs.getReader(), @@ -21,6 +21,16 @@ export default (label, factory, error) => { ); }); + test('should not be able to obtain additional readers if we don\'t release the first lock', t => { + t.plan(2); + const rs = factory(); + + rs.getReader(); + + t.throws(() => rs.getReader(), /TypeError/, 'getting a second reader should throw a TypeError'); + t.throws(() => rs.getReader(), /TypeError/, 'getting a third reader should throw a TypeError'); + }); + test('cancel() should return a distinct rejected promise each time', t => { t.plan(3); const rs = factory(); @@ -45,14 +55,4 @@ export default (label, factory, error) => { cancelPromise2.catch(e => t.equal(e, error, 'second cancel() call should reject with the error')); t.notEqual(cancelPromise1, cancelPromise2, 'cancel() calls should return distinct promises'); }); - - test('should be able to acquire multiple readers, since they are all auto-released', t => { - const rs = factory(); - - rs.getReader(); - - t.doesNotThrow(() => rs.getReader(), 'getting a second reader should not throw'); - t.doesNotThrow(() => rs.getReader(), 'getting a third reader should not throw'); - t.end(); - }); }; diff --git a/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js b/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js index 906212069..c5d075fc8 100644 --- a/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js +++ b/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js @@ -34,7 +34,7 @@ export default (label, factory, chunks) => { .catch(e => t.error(e)); }); - test('draining the stream via read() should cause the reader closed promise to fulfill and locked to be false', t => { + test('draining the stream via read() should cause the reader closed promise to fulfill, but locked stays true', t => { t.plan(3); const { stream, reader } = factory(); @@ -44,7 +44,7 @@ export default (label, factory, chunks) => { reader.closed.then( v => { t.equal(v, undefined, 'reader closed should fulfill with undefined'); - t.equal(stream.locked, false, 'stream should no longer be locked'); + t.equal(stream.locked, true, 'stream should remain locked'); }, () => t.fail('reader closed should not reject') ); @@ -53,13 +53,15 @@ export default (label, factory, chunks) => { reader.read(); }); - test('releasing the lock after the stream is closed should do nothing', t => { - t.plan(1); - const { reader } = factory(); + test('releasing the lock after the stream is closed should cause locked to become false', t => { + t.plan(3); + const { stream, reader } = factory(); - reader.closed.then( - () => t.doesNotThrow(() => reader.releaseLock(), 'releasing the lock after reader closed should not throw') - ); + reader.closed.then(() => { + t.equal(stream.locked, true, 'the stream should start locked'); + t.doesNotThrow(() => reader.releaseLock(), 'releasing the lock after reader closed should not throw'); + t.equal(stream.locked, false, 'the stream should end unlocked'); + }); reader.read(); reader.read(); From 6f4bb19dfdecae76007af9e5aa29669fd46d4f93 Mon Sep 17 00:00:00 2001 From: Takeshi Yoshino Date: Mon, 3 Aug 2015 18:31:32 +0900 Subject: [PATCH 2/4] Add IsReadableStreamDisturbed predicate Closes #378. --- index.bs | 17 ++++++++++ .../lib/readable-stream.js | 12 +++++++ reference-implementation/test/abstract-ops.js | 33 +++++++++++++++++++ 3 files changed, 62 insertions(+) create mode 100644 reference-implementation/test/abstract-ops.js diff --git a/index.bs b/index.bs index f58835486..551f24a9d 100644 --- a/index.bs +++ b/index.bs @@ -310,6 +310,10 @@ Instances of ReadableStream are created with the internal slots des A ReadableStreamController created with the ability to control the state and queue of this stream + + \[[disturbed]] + A boolean flag set to true when the stream has been read from or canceled + \[[pullAgain]] A boolean flag set to true if the stream's mechanisms requested a call to the underlying source's @@ -398,6 +402,7 @@ Instances of ReadableStream are created with the internal slots des 1. Set *this*@[[state]] to "readable". 1. Set *this*@[[started]], *this*@[[closeRequested]], *this*@[[pullAgain]], and *this*@[[pulling]] to *false*. 1. Set *this*@[[reader]] and *this*@[[storedError]] to *undefined*. + 1. Set *this*@[[disturbed]] to *false*. 1. Set *this*@[[controller]] to Construct(`ReadableStreamController`, «*this*»). 1. Let _normalizedStrategy_ be ValidateAndNormalizeQueuingStrategy(_size_, _highWaterMark_). 1. Set *this*@[[strategySize]] to _normalizedStrategy_.[[size]] and *this*@[[strategyHWM]] to _normalizedStrategy_.[[highWaterMark]]. @@ -875,6 +880,7 @@ reader for a given stream. 1. If _stream_@[[state]] is "closed", return a new promise resolved with *undefined*. 1. If _stream_@[[state]] is "errored", return a new promise rejected with _stream_@[[storedError]]. + 1. Set _stream_@[[disturbed]] to *true*. 1. Set _stream_@[[queue]] to a new empty List. 1. Perform FinishClosingReadableStream(_stream_). 1. Let _sourceCancelPromise_ be PromiseInvokeOrNoop(_stream_@[[underlyingSource]], "cancel", «‍_reason_»). @@ -1010,6 +1016,16 @@ an assert). 1. Return *true*. +

IsReadableStreamDisturbed ( stream )

+ +This abstract operation is meant to be called from other specifications that may wish to query whether or not a +readable stream has ever been read from or canceled. + + + 1. Assert: IsReadableStream(_stream_) is *true*. + 1. Return _stream_@[[disturbed]]. + +

IsReadableStreamLocked ( stream )

This abstract operation is meant to be called from other specifications that may wish to query whether or not a @@ -1036,6 +1052,7 @@ readable stream is locked to a reader. 1. If _reader_@[[state]] is "errored", return a new promise rejected with _reader_@[[storedError]]. 1. Assert: _reader_@[[ownerReadableStream]] is not *undefined*. 1. Assert: _reader_@[[ownerReadableStream]]@[[state]] is "readable". + 1. Set _reader_@[[ownerReadableStream]]@[[disturbed]] to *true*. 1. If _reader_@[[ownerReadableStream]]@[[queue]] is not empty, 1. Let _chunk_ be DequeueValue(_reader_@[[ownerReadableStream]]@[[queue]]). 1. If _reader_@[[ownerReadableStream]]@[[closeRequested]] is *true* and _reader_@[[ownerReadableStream]]@[[queue]] is now empty, perform FinishClosingReadableStream(_reader_@[[ownerReadableStream]]). diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index b28b4c41d..df32b70da 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -16,6 +16,8 @@ export default class ReadableStream { this._reader = undefined; this._storedError = undefined; + this._disturbed = false; + const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark); this._strategySize = normalizedStrategy.size; this._strategyHWM = normalizedStrategy.highWaterMark; @@ -357,6 +359,8 @@ function CancelReadableStream(stream, reason) { return Promise.reject(stream._storedError); } + stream._disturbed = true; + stream._queue = []; FinishClosingReadableStream(stream); @@ -493,6 +497,12 @@ function IsReadableStream(x) { return true; } +export function IsReadableStreamDisturbed(stream) { + assert(IsReadableStream(stream) === true, 'IsReadableStreamDisturbed should only be used on known readable streams'); + + return stream._disturbed; +} + function IsReadableStreamLocked(stream) { assert(IsReadableStream(stream) === true, 'IsReadableStreamLocked should only be used on known readable streams'); @@ -539,6 +549,8 @@ function ReadFromReadableStreamReader(reader) { assert(reader._ownerReadableStream !== undefined); assert(reader._ownerReadableStream._state === 'readable'); + reader._ownerReadableStream._disturbed = true; + if (reader._ownerReadableStream._queue.length > 0) { const chunk = DequeueValue(reader._ownerReadableStream._queue); diff --git a/reference-implementation/test/abstract-ops.js b/reference-implementation/test/abstract-ops.js new file mode 100644 index 000000000..3070a3d20 --- /dev/null +++ b/reference-implementation/test/abstract-ops.js @@ -0,0 +1,33 @@ +const test = require('tape-catch'); + +import { IsReadableStreamDisturbed } from '../lib/readable-stream' + +test('IsReadableStreamDisturbed returns true for a stream on which read() has been called', t => { + const rs = new ReadableStream(); + + t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); + + const reader = rs.getReader(); + t.equal(IsReadableStreamDisturbed(rs), false, + 'getReader() call has no effect on whether a stream is disturbed or not'); + + reader.read(); + t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after read() call'); + + t.end(); +}); + +test('IsReadableStreamDisturbed returns true for a stream on which cancel() has been called', t => { + const rs = new ReadableStream(); + + t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); + + const reader = rs.getReader(); + t.equal(IsReadableStreamDisturbed(rs), false, + 'getReader() call has no effect on whether a stream is disturbed or not'); + + reader.cancel(); + t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after cancel() call'); + + t.end(); +}); From bd839e5c7527ebf7800842f2d9a3bc7e629f09d1 Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Tue, 18 Aug 2015 17:57:54 -0400 Subject: [PATCH 3/4] Update IsReadableStreamDisturbed to work after close or error --- index.bs | 5 +- .../lib/readable-stream.js | 14 +++- reference-implementation/test/abstract-ops.js | 80 ++++++++++++++++++- 3 files changed, 91 insertions(+), 8 deletions(-) diff --git a/index.bs b/index.bs index 551f24a9d..f4aee142b 100644 --- a/index.bs +++ b/index.bs @@ -812,6 +812,7 @@ Instances of ReadableStreamReader are created with the internal slo 1. If IsReadableStreamReader(*this*) is *false*, return a promise rejected with a *TypeError* exception. + 1. If *this*@[[ownerReadableStream]] is not *undefined*, set *this*@[[ownerReadableStream]]@[[disturbed]] to *true*. 1. If *this*@[[state]] is "closed", return a new promise resolved with *undefined*. 1. If *this*@[[state]] is "errored", return a new promise rejected with *this*@[[storedError]]. 1. Assert: *this*@[[ownerReadableStream]] is not *undefined*. @@ -878,9 +879,9 @@ reader for a given stream.

CancelReadableStream ( stream, reason )

+ 1. Set _stream_@[[disturbed]] to *true*. 1. If _stream_@[[state]] is "closed", return a new promise resolved with *undefined*. 1. If _stream_@[[state]] is "errored", return a new promise rejected with _stream_@[[storedError]]. - 1. Set _stream_@[[disturbed]] to *true*. 1. Set _stream_@[[queue]] to a new empty List. 1. Perform FinishClosingReadableStream(_stream_). 1. Let _sourceCancelPromise_ be PromiseInvokeOrNoop(_stream_@[[underlyingSource]], "cancel", «‍_reason_»). @@ -1048,11 +1049,11 @@ readable stream is locked to a reader.

ReadFromReadableStreamReader ( reader )

+ 1. If *this*@[[ownerReadableStream]] is not *undefined*, set *this*@[[ownerReadableStream]]@[[disturbed]] to *true*. 1. If _reader_@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(*undefined*, *true*). 1. If _reader_@[[state]] is "errored", return a new promise rejected with _reader_@[[storedError]]. 1. Assert: _reader_@[[ownerReadableStream]] is not *undefined*. 1. Assert: _reader_@[[ownerReadableStream]]@[[state]] is "readable". - 1. Set _reader_@[[ownerReadableStream]]@[[disturbed]] to *true*. 1. If _reader_@[[ownerReadableStream]]@[[queue]] is not empty, 1. Let _chunk_ be DequeueValue(_reader_@[[ownerReadableStream]]@[[queue]]). 1. If _reader_@[[ownerReadableStream]]@[[closeRequested]] is *true* and _reader_@[[ownerReadableStream]]@[[queue]] is now empty, perform FinishClosingReadableStream(_reader_@[[ownerReadableStream]]). diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index df32b70da..7b6186676 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -299,6 +299,10 @@ class ReadableStreamReader { new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader')); } + if (this._ownerReadableStream !== undefined) { + this._ownerReadableStream._disturbed = true; + } + if (this._state === 'closed') { return Promise.resolve(undefined); } @@ -352,6 +356,8 @@ function AcquireReadableStreamReader(stream) { } function CancelReadableStream(stream, reason) { + stream._disturbed = true; + if (stream._state === 'closed') { return Promise.resolve(undefined); } @@ -359,8 +365,6 @@ function CancelReadableStream(stream, reason) { return Promise.reject(stream._storedError); } - stream._disturbed = true; - stream._queue = []; FinishClosingReadableStream(stream); @@ -538,6 +542,10 @@ function IsReadableStreamReader(x) { } function ReadFromReadableStreamReader(reader) { + if (reader._ownerReadableStream !== undefined) { + reader._ownerReadableStream._disturbed = true; + } + if (reader._state === 'closed') { return Promise.resolve(CreateIterResultObject(undefined, true)); } @@ -549,8 +557,6 @@ function ReadFromReadableStreamReader(reader) { assert(reader._ownerReadableStream !== undefined); assert(reader._ownerReadableStream._state === 'readable'); - reader._ownerReadableStream._disturbed = true; - if (reader._ownerReadableStream._queue.length > 0) { const chunk = DequeueValue(reader._ownerReadableStream._queue); diff --git a/reference-implementation/test/abstract-ops.js b/reference-implementation/test/abstract-ops.js index 3070a3d20..d3d1504f3 100644 --- a/reference-implementation/test/abstract-ops.js +++ b/reference-implementation/test/abstract-ops.js @@ -2,7 +2,7 @@ const test = require('tape-catch'); import { IsReadableStreamDisturbed } from '../lib/readable-stream' -test('IsReadableStreamDisturbed returns true for a stream on which read() has been called', t => { +test('IsReadableStreamDisturbed returns true for an empty non-closed stream on which read() has been called', t => { const rs = new ReadableStream(); t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); @@ -17,7 +17,7 @@ test('IsReadableStreamDisturbed returns true for a stream on which read() has be t.end(); }); -test('IsReadableStreamDisturbed returns true for a stream on which cancel() has been called', t => { +test('IsReadableStreamDisturbed returns true for an empty non-closed stream on which cancel() has been called', t => { const rs = new ReadableStream(); t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); @@ -31,3 +31,79 @@ test('IsReadableStreamDisturbed returns true for a stream on which cancel() has t.end(); }); + +test('IsReadableStreamDisturbed returns true for a closed stream on which read() has been called', t => { + const rs = new ReadableStream({ + start(c) { + c.close(); + } + }); + + t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); + + const reader = rs.getReader(); + t.equal(IsReadableStreamDisturbed(rs), false, + 'getReader() call has no effect on whether a stream is disturbed or not'); + + reader.read(); + t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after read() call'); + + t.end(); +}); + +test('IsReadableStreamDisturbed returns true for a closed stream on which cancel() has been called', t => { + const rs = new ReadableStream({ + start(c) { + c.close(); + } + }); + + t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); + + const reader = rs.getReader(); + t.equal(IsReadableStreamDisturbed(rs), false, + 'getReader() call has no effect on whether a stream is disturbed or not'); + + reader.cancel(); + t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after cancel() call'); + + t.end(); +}); + +test('IsReadableStreamDisturbed returns true for an errored stream on which read() has been called', t => { + const rs = new ReadableStream({ + start(c) { + c.error(new Error('waffles')); + } + }); + + t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); + + const reader = rs.getReader(); + t.equal(IsReadableStreamDisturbed(rs), false, + 'getReader() call has no effect on whether a stream is disturbed or not'); + + reader.read(); + t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after read() call'); + + t.end(); +}); + +test('IsReadableStreamDisturbed returns true for an errored stream on which cancel() has been called', t => { + const rs = new ReadableStream({ + start(c) { + c.error(new Error('waffles')); + } + }); + + t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); + + const reader = rs.getReader(); + t.equal(IsReadableStreamDisturbed(rs), false, + 'getReader() call has no effect on whether a stream is disturbed or not'); + + reader.cancel(); + t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after cancel() call'); + + t.end(); +}); From 632b26a05f3106650b1ec91239ad5b012e6c64af Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Fri, 21 Aug 2015 16:24:13 -0400 Subject: [PATCH 4/4] Make reader closed and read() reject after release This significantly simplifies the internals of readers, and their interaction with their owner stream. --- index.bs | 49 +++--------- .../lib/readable-stream.js | 79 +++++++------------ .../test/readable-stream-reader.js | 6 +- .../readable-stream-closed-reader.js | 14 ++++ .../templated/readable-stream-empty-reader.js | 21 ++--- .../readable-stream-errored-reader.js | 21 +++++ ...eadable-stream-two-chunks-closed-reader.js | 11 +-- 7 files changed, 94 insertions(+), 107 deletions(-) diff --git a/index.bs b/index.bs index f4aee142b..9f36bcf3e 100644 --- a/index.bs +++ b/index.bs @@ -750,16 +750,6 @@ Instances of ReadableStreamReader are created with the internal slo A List of promises returned by calls to the reader's read() method that have not yet been resolved, due to the consumer requesting chunks sooner than they are available - - \[[state]] - A string containing the reader's current state, used internally; one of "readable", - "closed", or "errored" - - - \[[storedError]] - A value indicating how the reader's stream failed, to be given as a failure reason or exception when trying to - operate on the reader; applicable only when \[[state]] is "errored" -

new ReadableStreamReader(stream)

@@ -773,19 +763,15 @@ Instances of ReadableStreamReader are created with the internal slo 1. If IsReadableStream(_stream_) is *false*, throw a *TypeError* exception. 1. If IsReadableStreamLocked(_stream_) is *true*, throw a *TypeError* exception. - 1. Set *this*@[[state]] to _stream_@[[state]]. 1. Set *this*@[[readRequests]] to a new empty List. 1. Set *this*@[[ownerReadableStream]] to _stream_. 1. Set _stream_@[[reader]] to *this*. 1. If _stream_@[[state]] is "readable", - 1. Set *this*@[[storedError]] to *undefined*. 1. Set *this*@[[closedPromise]] to a new promise. 1. Otherwise, if _stream_@[[state]] is "closed", - 1. Set *this*@[[storedError]] to *undefined*. 1. Set *this*@[[closedPromise]] to a new promise resolved with *undefined*. 1. Otherwise, 1. Assert: _stream_@[[state]] is "errored". - 1. Set *this*@[[storedError]] to _stream_@[[storedError]]. 1. Set *this*@[[closedPromise]] to a new promise rejected with _stream_@[[storedError]]. @@ -812,11 +798,7 @@ Instances of ReadableStreamReader are created with the internal slo 1. If IsReadableStreamReader(*this*) is *false*, return a promise rejected with a *TypeError* exception. - 1. If *this*@[[ownerReadableStream]] is not *undefined*, set *this*@[[ownerReadableStream]]@[[disturbed]] to *true*. - 1. If *this*@[[state]] is "closed", return a new promise resolved with *undefined*. - 1. If *this*@[[state]] is "errored", return a new promise rejected with *this*@[[storedError]]. - 1. Assert: *this*@[[ownerReadableStream]] is not *undefined*. - 1. Assert: *this*@[[ownerReadableStream]]@[[state]] is "readable". + 1. If *this*@[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception. 1. Return CancelReadableStream(*this*@[[ownerReadableStream]], _reason_). @@ -839,6 +821,7 @@ Instances of ReadableStreamReader are created with the internal slo 1. If IsReadableStreamReader(*this*) is *false*, return a promise rejected with a *TypeError* exception. + 1. If *this*@[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception. 1. Return ReadFromReadableStreamReader(*this*). @@ -859,7 +842,8 @@ Instances of ReadableStreamReader are created with the internal slo 1. If IsReadableStreamReader(*this*) is *false*, throw a *TypeError* exception. 1. If *this*@[[ownerReadableStream]] is *undefined*, return *undefined*. 1. If *this*@[[readRequests]] is not empty, throw a *TypeError* exception. - 1. If *this*@[[state]] is `"readable"`, perform CloseReadableStreamReader(*this*). + 1. If *this*@[[ownerReadableStream]]@[[state]] is `"readable"`, reject *this*@[[closedPromise]] with a *TypeError* exception. + 1. Otherwise, set *this*@[[closedPromise]] be a new promise rejected with a *TypeError* exception. 1. Set *this*@[[ownerReadableStream]]@[[reader]] to *undefined*. 1. Set *this*@[[ownerReadableStream]] to *undefined*. 1. Return *undefined*. @@ -911,17 +895,6 @@ this to streams they did not create, and must ensure they have obeyed the precon control of the underlying source. -

CloseReadableStreamReader ( reader )

- - - 1. Repeat for each _readRequestPromise_ that is an element of _reader_@[[readRequests]], - 1. Resolve _readRequestPromise_ with CreateIterResultObject(*undefined*, *true*). - 1. Set _reader_@[[readRequests]] to an empty List. - 1. Set _reader_@[[state]] to "closed". - 1. Resolve _reader_@[[closedPromise]] with *undefined*. - 1. Return *undefined*. - -

EnqueueInReadableStream ( stream, chunk )

This abstract operation can be called by other specifications that wish to enqueue chunks in a readable stream, @@ -978,8 +951,6 @@ an assert). 1. Repeat for each _readRequestPromise_ that is an element of _reader_@[[readRequests]], 1. Reject _readRequestPromise_ with _e_. 1. Set _reader_@[[readRequests]] to a new empty List. - 1. Set _reader_@[[storedError]] to _e_. - 1. Set _reader_@[[state]] to "errored". 1. Reject _reader_@[[closedPromise]] with _e_. 1. Return *undefined*.
@@ -990,7 +961,11 @@ an assert). 1. Assert: _stream_@[[state]] is "readable". 1. Set _stream_@[[state]] to "closed". 1. Let _reader_ be _stream_@[[reader]]. - 1. If _reader_ is not *undefined*, perform CloseReadableStreamReader(_reader_). + 1. If _reader_ is *undefined*, return *undefined*. + 1. Repeat for each _readRequestPromise_ that is an element of _reader_@[[readRequests]], + 1. Resolve _readRequestPromise_ with CreateIterResultObject(*undefined*, *true*). + 1. Set _reader_@[[readRequests]] to an empty List. + 1. Resolve _reader_@[[closedPromise]] with *undefined*. 1. Return *undefined*.
@@ -1049,10 +1024,10 @@ readable stream is locked to a reader.

ReadFromReadableStreamReader ( reader )

- 1. If *this*@[[ownerReadableStream]] is not *undefined*, set *this*@[[ownerReadableStream]]@[[disturbed]] to *true*. - 1. If _reader_@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(*undefined*, *true*). - 1. If _reader_@[[state]] is "errored", return a new promise rejected with _reader_@[[storedError]]. 1. Assert: _reader_@[[ownerReadableStream]] is not *undefined*. + 1. Set _reader_@[[ownerReadableStream]]@[[disturbed]] to *true*. + 1. If _reader_@[[ownerReadableStream]]@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(*undefined*, *true*). + 1. If _reader_@[[ownerReadableStream]]@[[state]] is "errored", return a new promise rejected with _reader_@[[ownerReadableStream]]@[[storedError]]. 1. Assert: _reader_@[[ownerReadableStream]]@[[state]] is "readable". 1. If _reader_@[[ownerReadableStream]]@[[queue]] is not empty, 1. Let _chunk_ be DequeueValue(_reader_@[[ownerReadableStream]]@[[queue]]). diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 7b6186676..b81771116 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -257,27 +257,19 @@ class ReadableStreamReader { this._ownerReadableStream = stream; stream._reader = this; - this._state = stream._state; this._readRequests = []; if (stream._state === 'readable') { - this._storedError = undefined; - this._closedPromise = new Promise((resolve, reject) => { this._closedPromise_resolve = resolve; this._closedPromise_reject = reject; }); } else if (stream._state === 'closed') { - this._storedError = undefined; - this._closedPromise = Promise.resolve(undefined); this._closedPromise_resolve = undefined; this._closedPromise_reject = undefined; } else { assert(stream._state === 'errored'); - - this._storedError = stream._storedError; - this._closedPromise = Promise.reject(stream._storedError); this._closedPromise_resolve = undefined; this._closedPromise_reject = undefined; @@ -299,21 +291,10 @@ class ReadableStreamReader { new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader')); } - if (this._ownerReadableStream !== undefined) { - this._ownerReadableStream._disturbed = true; - } - - if (this._state === 'closed') { - return Promise.resolve(undefined); - } - - if (this._state === 'errored') { - return Promise.reject(this._storedError); + if (this._ownerReadableStream === undefined) { + return Promise.reject(new TypeError('Cannot cancel a stream using a released reader')); } - assert(this._ownerReadableStream !== undefined); - assert(this._ownerReadableStream._state === 'readable'); - return CancelReadableStream(this._ownerReadableStream, reason); } @@ -323,6 +304,10 @@ class ReadableStreamReader { new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader')); } + if (this._ownerReadableStream === undefined) { + return Promise.reject(new TypeError('Cannot read from a released reader')); + } + return ReadFromReadableStreamReader(this); } @@ -339,8 +324,12 @@ class ReadableStreamReader { throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled'); } - if (this._state === 'readable') { - CloseReadableStreamReader(this); + if (this._ownerReadableStream._state === 'readable') { + this._closedPromise_reject( + new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness')); + } else { + this._closedPromise = Promise.reject( + new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness')); } this._ownerReadableStream._reader = undefined; @@ -389,21 +378,6 @@ function CloseReadableStream(stream) { } } -function CloseReadableStreamReader(reader) { - for (const { _resolve } of reader._readRequests) { - _resolve(CreateIterResultObject(undefined, true)); - } - reader._readRequests = []; - - reader._state = 'closed'; - - reader._closedPromise_resolve(undefined); - reader._closedPromise_resolve = undefined; - reader._closedPromise_reject = undefined; - - return undefined; -} - function EnqueueInReadableStream(stream, chunk) { assert(stream._closeRequested === false); assert(stream._state !== 'errored'); @@ -460,9 +434,6 @@ function ErrorReadableStream(stream, e) { } reader._readRequests = []; - reader._storedError = e; - reader._state = 'errored'; - reader._closedPromise_reject(e); reader._closedPromise_resolve = undefined; reader._closedPromise_reject = undefined; @@ -477,10 +448,19 @@ function FinishClosingReadableStream(stream) { const reader = stream._reader; - if (reader !== undefined) { - CloseReadableStreamReader(reader); + if (reader === undefined) { + return undefined; } + for (const { _resolve } of reader._readRequests) { + _resolve(CreateIterResultObject(undefined, true)); + } + reader._readRequests = []; + + reader._closedPromise_resolve(undefined); + reader._closedPromise_resolve = undefined; + reader._closedPromise_reject = undefined; + return undefined; } @@ -542,19 +522,18 @@ function IsReadableStreamReader(x) { } function ReadFromReadableStreamReader(reader) { - if (reader._ownerReadableStream !== undefined) { - reader._ownerReadableStream._disturbed = true; - } + assert(reader._ownerReadableStream !== undefined); - if (reader._state === 'closed') { + reader._ownerReadableStream._disturbed = true; + + if (reader._ownerReadableStream._state === 'closed') { return Promise.resolve(CreateIterResultObject(undefined, true)); } - if (reader._state === 'errored') { - return Promise.reject(reader._storedError); + if (reader._ownerReadableStream._state === 'errored') { + return Promise.reject(reader._ownerReadableStream._storedError); } - assert(reader._ownerReadableStream !== undefined); assert(reader._ownerReadableStream._state === 'readable'); if (reader._ownerReadableStream._queue.length > 0) { diff --git a/reference-implementation/test/readable-stream-reader.js b/reference-implementation/test/readable-stream-reader.js index c10b2c427..0881339e5 100644 --- a/reference-implementation/test/readable-stream-reader.js +++ b/reference-implementation/test/readable-stream-reader.js @@ -173,7 +173,7 @@ test('closed should be fulfilled after stream is closed (.closed access before a controller.close(); }); -test('closed should be fulfilled after reader releases its lock (multiple stream locks)', t => { +test('closed should be rejected after reader releases its lock (multiple stream locks)', t => { t.plan(2); let controller; @@ -190,8 +190,8 @@ test('closed should be fulfilled after reader releases its lock (multiple stream const reader2 = rs.getReader(); controller.close(); - reader1.closed.then(() => { - t.pass('reader1 closed should be fulfilled'); + reader1.closed.catch(e => { + t.equal(e.constructor, TypeError, 'reader1 closed should be rejected with a TypeError'); }); reader2.closed.then(() => { diff --git a/reference-implementation/test/templated/readable-stream-closed-reader.js b/reference-implementation/test/templated/readable-stream-closed-reader.js index 8af8518ee..178869ae5 100644 --- a/reference-implementation/test/templated/readable-stream-closed-reader.js +++ b/reference-implementation/test/templated/readable-stream-closed-reader.js @@ -46,6 +46,20 @@ export default (label, factory) => { ); }); + test('releasing the lock should cause closed to reject and change identity', t => { + t.plan(3); + const { reader } = factory(); + + const closedBefore = reader.closed; + reader.releaseLock(); + const closedAfter = reader.closed; + + t.notEqual(closedBefore, closedAfter, 'the closed promise should change identity') + closedBefore.then(v => t.equal(v, undefined, 'reader.closed acquired before release should fulfill')); + closedAfter.catch( + e => t.equal(e.constructor, TypeError, 'reader.closed acquired after release should reject with a TypeError')); + }); + test('cancel() should return a distinct fulfilled promise each time', t => { t.plan(5); const { reader } = factory(); diff --git a/reference-implementation/test/templated/readable-stream-empty-reader.js b/reference-implementation/test/templated/readable-stream-empty-reader.js index 9fb1d6ac2..082212831 100644 --- a/reference-implementation/test/templated/readable-stream-empty-reader.js +++ b/reference-implementation/test/templated/readable-stream-empty-reader.js @@ -67,7 +67,7 @@ export default (label, factory) => { }); test('releasing the lock with pending read requests should throw but the read requests should stay pending', t => { - const { reader } = factory(); + const { stream, reader } = factory(); reader.read().then( () => t.fail('first read() should not fulfill'), @@ -86,30 +86,31 @@ export default (label, factory) => { t.throws(() => reader.releaseLock(), /TypeError/, 'releaseLock should throw a TypeError'); + t.equal(stream.locked, true, 'the stream should still be locked'); + setTimeout(() => t.end(), 50); }); - test('releasing the lock should cause further read() calls to resolve as if the stream is closed', t => { + test('releasing the lock should cause further read() calls to reject with a TypeError', t => { t.plan(2); const { reader } = factory(); reader.releaseLock(); - reader.read().then(r => - t.deepEqual(r, { value: undefined, done: true }, 'first read() should return closed result')); - reader.read().then(r => - t.deepEqual(r, { value: undefined, done: true }, 'second read() should return closed result')); + reader.read().catch(e => t.equal(e.constructor, TypeError, 'first read() should reject with a TypeError')); + reader.read().catch(e => t.equal(e.constructor, TypeError, 'second read() should reject with a TypeError')); }); - test('releasing the lock should cause closed to fulfill', t => { + test('releasing the lock should cause closed to reject', t => { t.plan(2); const { reader } = factory(); - reader.closed.then(v => t.equal(v, undefined, 'reader.closed got before release should fulfill with undefined')); - + const closedBefore = reader.closed; reader.releaseLock(); + const closedAfter = reader.closed; - reader.closed.then(v => t.equal(v, undefined, 'reader.closed got after release should fulfill with undefined')); + t.equal(closedBefore, closedAfter, 'the closed promise should not change identity') + closedBefore.catch(e => t.equal(e.constructor, TypeError, 'reader.closed should reject with a TypeError')); }); test('releasing the lock should cause locked to become false', t => { diff --git a/reference-implementation/test/templated/readable-stream-errored-reader.js b/reference-implementation/test/templated/readable-stream-errored-reader.js index 1b81ff8a9..7be5d0c88 100644 --- a/reference-implementation/test/templated/readable-stream-errored-reader.js +++ b/reference-implementation/test/templated/readable-stream-errored-reader.js @@ -15,6 +15,27 @@ export default (label, factory, error) => { ); }); + test('releasing the lock should cause closed to reject and change identity', t => { + t.plan(3); + const { reader } = factory(); + + const closedBefore = reader.closed; + + closedBefore.catch(e => { + t.equal(e, error, 'reader.closed acquired before release should reject with the error'); + + reader.releaseLock(); + const closedAfter = reader.closed; + + t.notEqual(closedBefore, closedAfter, 'the closed promise should change identity'); + + return closedAfter.catch(e => { + t.equal(e.constructor, TypeError, 'reader.closed acquired after release should reject with a TypeError'); + }); + }) + .catch(e => t.error(e)); + }); + test('read() should reject with the error', t => { t.plan(1); const { reader } = factory(); diff --git a/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js b/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js index c5d075fc8..4ea6138f1 100644 --- a/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js +++ b/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js @@ -67,18 +67,15 @@ export default (label, factory, chunks) => { reader.read(); }); - test('releasing the lock should cause read() to act as if the stream is closed', t => { + test('releasing the lock should cause further read() calls to reject with a TypeError', t => { t.plan(3); const { reader } = factory(); reader.releaseLock(); - reader.read().then(r => - t.deepEqual(r, { value: undefined, done: true }, 'first read() should return closed result')); - reader.read().then(r => - t.deepEqual(r, { value: undefined, done: true }, 'second read() should return closed result')); - reader.read().then(r => - t.deepEqual(r, { value: undefined, done: true }, 'third read() should return closed result')); + reader.read().catch(e => t.equal(e.constructor, TypeError, 'first read() should reject with a TypeError')); + reader.read().catch(e => t.equal(e.constructor, TypeError, 'second read() should reject with a TypeError')); + reader.read().catch(e => t.equal(e.constructor, TypeError, 'third read() should reject with a TypeError')); }); test('reader\'s closed property always returns the same promise', t => {