Skip to content

Commit

Permalink
Make reader closed and read() reject after release
Browse files Browse the repository at this point in the history
This significantly simplifies the internals of readers, and their interaction with their owner stream.
  • Loading branch information
domenic committed Aug 27, 2015
1 parent bd839e5 commit 632b26a
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 107 deletions.
49 changes: 12 additions & 37 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -750,16 +750,6 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
<td>A List of promises returned by calls to the reader's <code>read()</code> method that have not yet been resolved,
due to the <a>consumer</a> requesting <a>chunks</a> sooner than they are available
</tr>
<tr>
<td>\[[state]]
<td>A string containing the reader's current state, used internally; one of <code>"readable"</code>,
<code>"closed"</code>, or <code>"errored"</code>
</tr>
<tr>
<td>\[[storedError]]
<td>A value indicating how the reader's stream failed, to be given as a failure reason or exception when trying to
operate on the reader; applicable only when \[[state]] is <code>"errored"</code>
</tr>
</table>

<h4 id="reader-constructor">new ReadableStreamReader(stream)</h4>
Expand All @@ -773,19 +763,15 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
<emu-alg>
1. If IsReadableStream(_stream_) is *false*, throw a *TypeError* exception.
1. If IsReadableStreamLocked(_stream_) is *true*, throw a *TypeError* exception.
1. Set *this*@[[state]] to _stream_@[[state]].
1. Set *this*@[[readRequests]] to a new empty List.
1. Set *this*@[[ownerReadableStream]] to _stream_.
1. Set _stream_@[[reader]] to *this*.
1. If _stream_@[[state]] is "readable",
1. Set *this*@[[storedError]] to *undefined*.
1. Set *this*@[[closedPromise]] to a new promise.
1. Otherwise, if _stream_@[[state]] is "closed",
1. Set *this*@[[storedError]] to *undefined*.
1. Set *this*@[[closedPromise]] to a new promise resolved with *undefined*.
1. Otherwise,
1. Assert: _stream_@[[state]] is "errored".
1. Set *this*@[[storedError]] to _stream_@[[storedError]].
1. Set *this*@[[closedPromise]] to a new promise rejected with _stream_@[[storedError]].
</emu-alg>

Expand All @@ -812,11 +798,7 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo

<emu-alg>
1. If IsReadableStreamReader(*this*) is *false*, return a promise rejected with a *TypeError* exception.
1. If *this*@[[ownerReadableStream]] is not *undefined*, set *this*@[[ownerReadableStream]]@[[disturbed]] to *true*.
1. If *this*@[[state]] is "closed", return a new promise resolved with *undefined*.
1. If *this*@[[state]] is "errored", return a new promise rejected with *this*@[[storedError]].
1. Assert: *this*@[[ownerReadableStream]] is not *undefined*.
1. Assert: *this*@[[ownerReadableStream]]@[[state]] is "readable".
1. If *this*@[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception.
1. Return CancelReadableStream(*this*@[[ownerReadableStream]], _reason_).
</emu-alg>

Expand All @@ -839,6 +821,7 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo

<emu-alg>
1. If IsReadableStreamReader(*this*) is *false*, return a promise rejected with a *TypeError* exception.
1. If *this*@[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception.
1. Return ReadFromReadableStreamReader(*this*).
</emu-alg>

Expand All @@ -859,7 +842,8 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
1. If IsReadableStreamReader(*this*) is *false*, throw a *TypeError* exception.
1. If *this*@[[ownerReadableStream]] is *undefined*, return *undefined*.
1. If *this*@[[readRequests]] is not empty, throw a *TypeError* exception.
1. If *this*@[[state]] is `"readable"`, perform CloseReadableStreamReader(*this*).
1. If *this*@[[ownerReadableStream]]@[[state]] is `"readable"`, reject *this*@[[closedPromise]] with a *TypeError* exception.
1. Otherwise, set *this*@[[closedPromise]] be a new promise rejected with a *TypeError* exception.
1. Set *this*@[[ownerReadableStream]]@[[reader]] to *undefined*.
1. Set *this*@[[ownerReadableStream]] to *undefined*.
1. Return *undefined*.
Expand Down Expand Up @@ -911,17 +895,6 @@ this to streams they did not create, and must ensure they have obeyed the precon
control of the underlying source.
</div>

<h4 id="close-readable-stream-reader" aoid="CloseReadableStreamReader" nothrow>CloseReadableStreamReader ( reader )</h4>

<emu-alg>
1. Repeat for each _readRequestPromise_ that is an element of _reader_@[[readRequests]],
1. Resolve _readRequestPromise_ with CreateIterResultObject(*undefined*, *true*).
1. Set _reader_@[[readRequests]] to an empty List.
1. Set _reader_@[[state]] to "closed".
1. Resolve _reader_@[[closedPromise]] with *undefined*.
1. Return *undefined*.
</emu-alg>

<h4 id="enqueue-in-readable-stream" aoid="EnqueueInReadableStream" throws>EnqueueInReadableStream ( stream, chunk )</h4>

This abstract operation can be called by other specifications that wish to enqueue <a>chunks</a> in a readable stream,
Expand Down Expand Up @@ -978,8 +951,6 @@ an assert).
1. Repeat for each _readRequestPromise_ that is an element of _reader_@[[readRequests]],
1. Reject _readRequestPromise_ with _e_.
1. Set _reader_@[[readRequests]] to a new empty List.
1. Set _reader_@[[storedError]] to _e_.
1. Set _reader_@[[state]] to "errored".
1. Reject _reader_@[[closedPromise]] with _e_.
1. Return *undefined*.
</emu-alg>
Expand All @@ -990,7 +961,11 @@ an assert).
1. Assert: _stream_@[[state]] is "readable".
1. Set _stream_@[[state]] to "closed".
1. Let _reader_ be _stream_@[[reader]].
1. If _reader_ is not *undefined*, perform CloseReadableStreamReader(_reader_).
1. If _reader_ is *undefined*, return *undefined*.
1. Repeat for each _readRequestPromise_ that is an element of _reader_@[[readRequests]],
1. Resolve _readRequestPromise_ with CreateIterResultObject(*undefined*, *true*).
1. Set _reader_@[[readRequests]] to an empty List.
1. Resolve _reader_@[[closedPromise]] with *undefined*.
1. Return *undefined*.
</emu-alg>

Expand Down Expand Up @@ -1049,10 +1024,10 @@ readable stream is <a>locked to a reader</a>.
<h4 id="read-from-readable-stream-reader" aoid="ReadFromReadableStreamReader" nothrow>ReadFromReadableStreamReader ( reader )</h4>

<emu-alg>
1. If *this*@[[ownerReadableStream]] is not *undefined*, set *this*@[[ownerReadableStream]]@[[disturbed]] to *true*.
1. If _reader_@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(*undefined*, *true*).
1. If _reader_@[[state]] is "errored", return a new promise rejected with _reader_@[[storedError]].
1. Assert: _reader_@[[ownerReadableStream]] is not *undefined*.
1. Set _reader_@[[ownerReadableStream]]@[[disturbed]] to *true*.
1. If _reader_@[[ownerReadableStream]]@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(*undefined*, *true*).
1. If _reader_@[[ownerReadableStream]]@[[state]] is "errored", return a new promise rejected with _reader_@[[ownerReadableStream]]@[[storedError]].
1. Assert: _reader_@[[ownerReadableStream]]@[[state]] is "readable".
1. If _reader_@[[ownerReadableStream]]@[[queue]] is not empty,
1. Let _chunk_ be DequeueValue(_reader_@[[ownerReadableStream]]@[[queue]]).
Expand Down
79 changes: 29 additions & 50 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,27 +257,19 @@ class ReadableStreamReader {
this._ownerReadableStream = stream;
stream._reader = this;

this._state = stream._state;
this._readRequests = [];

if (stream._state === 'readable') {
this._storedError = undefined;

this._closedPromise = new Promise((resolve, reject) => {
this._closedPromise_resolve = resolve;
this._closedPromise_reject = reject;
});
} else if (stream._state === 'closed') {
this._storedError = undefined;

this._closedPromise = Promise.resolve(undefined);
this._closedPromise_resolve = undefined;
this._closedPromise_reject = undefined;
} else {
assert(stream._state === 'errored');

this._storedError = stream._storedError;

this._closedPromise = Promise.reject(stream._storedError);
this._closedPromise_resolve = undefined;
this._closedPromise_reject = undefined;
Expand All @@ -299,21 +291,10 @@ class ReadableStreamReader {
new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader'));
}

if (this._ownerReadableStream !== undefined) {
this._ownerReadableStream._disturbed = true;
}

if (this._state === 'closed') {
return Promise.resolve(undefined);
}

if (this._state === 'errored') {
return Promise.reject(this._storedError);
if (this._ownerReadableStream === undefined) {
return Promise.reject(new TypeError('Cannot cancel a stream using a released reader'));
}

assert(this._ownerReadableStream !== undefined);
assert(this._ownerReadableStream._state === 'readable');

return CancelReadableStream(this._ownerReadableStream, reason);
}

Expand All @@ -323,6 +304,10 @@ class ReadableStreamReader {
new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader'));
}

if (this._ownerReadableStream === undefined) {
return Promise.reject(new TypeError('Cannot read from a released reader'));
}

return ReadFromReadableStreamReader(this);
}

Expand All @@ -339,8 +324,12 @@ class ReadableStreamReader {
throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled');
}

if (this._state === 'readable') {
CloseReadableStreamReader(this);
if (this._ownerReadableStream._state === 'readable') {
this._closedPromise_reject(
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
} else {
this._closedPromise = Promise.reject(
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
}

this._ownerReadableStream._reader = undefined;
Expand Down Expand Up @@ -389,21 +378,6 @@ function CloseReadableStream(stream) {
}
}

function CloseReadableStreamReader(reader) {
for (const { _resolve } of reader._readRequests) {
_resolve(CreateIterResultObject(undefined, true));
}
reader._readRequests = [];

reader._state = 'closed';

reader._closedPromise_resolve(undefined);
reader._closedPromise_resolve = undefined;
reader._closedPromise_reject = undefined;

return undefined;
}

function EnqueueInReadableStream(stream, chunk) {
assert(stream._closeRequested === false);
assert(stream._state !== 'errored');
Expand Down Expand Up @@ -460,9 +434,6 @@ function ErrorReadableStream(stream, e) {
}
reader._readRequests = [];

reader._storedError = e;
reader._state = 'errored';

reader._closedPromise_reject(e);
reader._closedPromise_resolve = undefined;
reader._closedPromise_reject = undefined;
Expand All @@ -477,10 +448,19 @@ function FinishClosingReadableStream(stream) {

const reader = stream._reader;

if (reader !== undefined) {
CloseReadableStreamReader(reader);
if (reader === undefined) {
return undefined;
}

for (const { _resolve } of reader._readRequests) {
_resolve(CreateIterResultObject(undefined, true));
}
reader._readRequests = [];

reader._closedPromise_resolve(undefined);
reader._closedPromise_resolve = undefined;
reader._closedPromise_reject = undefined;

return undefined;
}

Expand Down Expand Up @@ -542,19 +522,18 @@ function IsReadableStreamReader(x) {
}

function ReadFromReadableStreamReader(reader) {
if (reader._ownerReadableStream !== undefined) {
reader._ownerReadableStream._disturbed = true;
}
assert(reader._ownerReadableStream !== undefined);

if (reader._state === 'closed') {
reader._ownerReadableStream._disturbed = true;

if (reader._ownerReadableStream._state === 'closed') {
return Promise.resolve(CreateIterResultObject(undefined, true));
}

if (reader._state === 'errored') {
return Promise.reject(reader._storedError);
if (reader._ownerReadableStream._state === 'errored') {
return Promise.reject(reader._ownerReadableStream._storedError);
}

assert(reader._ownerReadableStream !== undefined);
assert(reader._ownerReadableStream._state === 'readable');

if (reader._ownerReadableStream._queue.length > 0) {
Expand Down
6 changes: 3 additions & 3 deletions reference-implementation/test/readable-stream-reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ test('closed should be fulfilled after stream is closed (.closed access before a
controller.close();
});

test('closed should be fulfilled after reader releases its lock (multiple stream locks)', t => {
test('closed should be rejected after reader releases its lock (multiple stream locks)', t => {
t.plan(2);

let controller;
Expand All @@ -190,8 +190,8 @@ test('closed should be fulfilled after reader releases its lock (multiple stream
const reader2 = rs.getReader();
controller.close();

reader1.closed.then(() => {
t.pass('reader1 closed should be fulfilled');
reader1.closed.catch(e => {
t.equal(e.constructor, TypeError, 'reader1 closed should be rejected with a TypeError');
});

reader2.closed.then(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ export default (label, factory) => {
);
});

test('releasing the lock should cause closed to reject and change identity', t => {
t.plan(3);
const { reader } = factory();

const closedBefore = reader.closed;
reader.releaseLock();
const closedAfter = reader.closed;

t.notEqual(closedBefore, closedAfter, 'the closed promise should change identity')
closedBefore.then(v => t.equal(v, undefined, 'reader.closed acquired before release should fulfill'));
closedAfter.catch(
e => t.equal(e.constructor, TypeError, 'reader.closed acquired after release should reject with a TypeError'));
});

test('cancel() should return a distinct fulfilled promise each time', t => {
t.plan(5);
const { reader } = factory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export default (label, factory) => {
});

test('releasing the lock with pending read requests should throw but the read requests should stay pending', t => {
const { reader } = factory();
const { stream, reader } = factory();

reader.read().then(
() => t.fail('first read() should not fulfill'),
Expand All @@ -86,30 +86,31 @@ export default (label, factory) => {

t.throws(() => reader.releaseLock(), /TypeError/, 'releaseLock should throw a TypeError');

t.equal(stream.locked, true, 'the stream should still be locked');

setTimeout(() => t.end(), 50);
});

test('releasing the lock should cause further read() calls to resolve as if the stream is closed', t => {
test('releasing the lock should cause further read() calls to reject with a TypeError', t => {
t.plan(2);
const { reader } = factory();

reader.releaseLock();

reader.read().then(r =>
t.deepEqual(r, { value: undefined, done: true }, 'first read() should return closed result'));
reader.read().then(r =>
t.deepEqual(r, { value: undefined, done: true }, 'second read() should return closed result'));
reader.read().catch(e => t.equal(e.constructor, TypeError, 'first read() should reject with a TypeError'));
reader.read().catch(e => t.equal(e.constructor, TypeError, 'second read() should reject with a TypeError'));
});

test('releasing the lock should cause closed to fulfill', t => {
test('releasing the lock should cause closed to reject', t => {
t.plan(2);
const { reader } = factory();

reader.closed.then(v => t.equal(v, undefined, 'reader.closed got before release should fulfill with undefined'));

const closedBefore = reader.closed;
reader.releaseLock();
const closedAfter = reader.closed;

reader.closed.then(v => t.equal(v, undefined, 'reader.closed got after release should fulfill with undefined'));
t.equal(closedBefore, closedAfter, 'the closed promise should not change identity')
closedBefore.catch(e => t.equal(e.constructor, TypeError, 'reader.closed should reject with a TypeError'));
});

test('releasing the lock should cause locked to become false', t => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,27 @@ export default (label, factory, error) => {
);
});

test('releasing the lock should cause closed to reject and change identity', t => {
t.plan(3);
const { reader } = factory();

const closedBefore = reader.closed;

closedBefore.catch(e => {
t.equal(e, error, 'reader.closed acquired before release should reject with the error');

reader.releaseLock();
const closedAfter = reader.closed;

t.notEqual(closedBefore, closedAfter, 'the closed promise should change identity');

return closedAfter.catch(e => {
t.equal(e.constructor, TypeError, 'reader.closed acquired after release should reject with a TypeError');
});
})
.catch(e => t.error(e));
});

test('read() should reject with the error', t => {
t.plan(1);
const { reader } = factory();
Expand Down
Loading

0 comments on commit 632b26a

Please sign in to comment.