Skip to content

Commit

Permalink
Update IsReadableStreamDisturbed to work after close or error
Browse files Browse the repository at this point in the history
  • Loading branch information
domenic committed Aug 27, 2015
1 parent 6f4bb19 commit bd839e5
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 8 deletions.
5 changes: 3 additions & 2 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo

<emu-alg>
1. If IsReadableStreamReader(*this*) is *false*, return a promise rejected with a *TypeError* exception.
1. If *this*@[[ownerReadableStream]] is not *undefined*, set *this*@[[ownerReadableStream]]@[[disturbed]] to *true*.
1. If *this*@[[state]] is "closed", return a new promise resolved with *undefined*.
1. If *this*@[[state]] is "errored", return a new promise rejected with *this*@[[storedError]].
1. Assert: *this*@[[ownerReadableStream]] is not *undefined*.
Expand Down Expand Up @@ -878,9 +879,9 @@ reader</a> for a given stream.
<h4 id="cancel-readable-stream" aoid="CancelReadableStream" nothrow>CancelReadableStream ( stream, reason )</h4>

<emu-alg>
1. Set _stream_@[[disturbed]] to *true*.
1. If _stream_@[[state]] is "closed", return a new promise resolved with *undefined*.
1. If _stream_@[[state]] is "errored", return a new promise rejected with _stream_@[[storedError]].
1. Set _stream_@[[disturbed]] to *true*.
1. Set _stream_@[[queue]] to a new empty List.
1. Perform FinishClosingReadableStream(_stream_).
1. Let _sourceCancelPromise_ be PromiseInvokeOrNoop(_stream_@[[underlyingSource]], "cancel", «‍_reason_»).
Expand Down Expand Up @@ -1048,11 +1049,11 @@ readable stream is <a>locked to a reader</a>.
<h4 id="read-from-readable-stream-reader" aoid="ReadFromReadableStreamReader" nothrow>ReadFromReadableStreamReader ( reader )</h4>

<emu-alg>
1. If *this*@[[ownerReadableStream]] is not *undefined*, set *this*@[[ownerReadableStream]]@[[disturbed]] to *true*.
1. If _reader_@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(*undefined*, *true*).
1. If _reader_@[[state]] is "errored", return a new promise rejected with _reader_@[[storedError]].
1. Assert: _reader_@[[ownerReadableStream]] is not *undefined*.
1. Assert: _reader_@[[ownerReadableStream]]@[[state]] is "readable".
1. Set _reader_@[[ownerReadableStream]]@[[disturbed]] to *true*.
1. If _reader_@[[ownerReadableStream]]@[[queue]] is not empty,
1. Let _chunk_ be DequeueValue(_reader_@[[ownerReadableStream]]@[[queue]]).
1. If _reader_@[[ownerReadableStream]]@[[closeRequested]] is *true* and _reader_@[[ownerReadableStream]]@[[queue]] is now empty, perform FinishClosingReadableStream(_reader_@[[ownerReadableStream]]).
Expand Down
14 changes: 10 additions & 4 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ class ReadableStreamReader {
new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader'));
}

if (this._ownerReadableStream !== undefined) {
this._ownerReadableStream._disturbed = true;
}

if (this._state === 'closed') {
return Promise.resolve(undefined);
}
Expand Down Expand Up @@ -352,15 +356,15 @@ function AcquireReadableStreamReader(stream) {
}

function CancelReadableStream(stream, reason) {
stream._disturbed = true;

if (stream._state === 'closed') {
return Promise.resolve(undefined);
}
if (stream._state === 'errored') {
return Promise.reject(stream._storedError);
}

stream._disturbed = true;

stream._queue = [];
FinishClosingReadableStream(stream);

Expand Down Expand Up @@ -538,6 +542,10 @@ function IsReadableStreamReader(x) {
}

function ReadFromReadableStreamReader(reader) {
if (reader._ownerReadableStream !== undefined) {
reader._ownerReadableStream._disturbed = true;
}

if (reader._state === 'closed') {
return Promise.resolve(CreateIterResultObject(undefined, true));
}
Expand All @@ -549,8 +557,6 @@ function ReadFromReadableStreamReader(reader) {
assert(reader._ownerReadableStream !== undefined);
assert(reader._ownerReadableStream._state === 'readable');

reader._ownerReadableStream._disturbed = true;

if (reader._ownerReadableStream._queue.length > 0) {
const chunk = DequeueValue(reader._ownerReadableStream._queue);

Expand Down
80 changes: 78 additions & 2 deletions reference-implementation/test/abstract-ops.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const test = require('tape-catch');

import { IsReadableStreamDisturbed } from '../lib/readable-stream'

test('IsReadableStreamDisturbed returns true for a stream on which read() has been called', t => {
test('IsReadableStreamDisturbed returns true for an empty non-closed stream on which read() has been called', t => {
const rs = new ReadableStream();

t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction');
Expand All @@ -17,7 +17,7 @@ test('IsReadableStreamDisturbed returns true for a stream on which read() has be
t.end();
});

test('IsReadableStreamDisturbed returns true for a stream on which cancel() has been called', t => {
test('IsReadableStreamDisturbed returns true for an empty non-closed stream on which cancel() has been called', t => {
const rs = new ReadableStream();

t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction');
Expand All @@ -31,3 +31,79 @@ test('IsReadableStreamDisturbed returns true for a stream on which cancel() has

t.end();
});

test('IsReadableStreamDisturbed returns true for a closed stream on which read() has been called', t => {
const rs = new ReadableStream({
start(c) {
c.close();
}
});

t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction');

const reader = rs.getReader();
t.equal(IsReadableStreamDisturbed(rs), false,
'getReader() call has no effect on whether a stream is disturbed or not');

reader.read();
t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after read() call');

t.end();
});

test('IsReadableStreamDisturbed returns true for a closed stream on which cancel() has been called', t => {
const rs = new ReadableStream({
start(c) {
c.close();
}
});

t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction');

const reader = rs.getReader();
t.equal(IsReadableStreamDisturbed(rs), false,
'getReader() call has no effect on whether a stream is disturbed or not');

reader.cancel();
t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after cancel() call');

t.end();
});

test('IsReadableStreamDisturbed returns true for an errored stream on which read() has been called', t => {
const rs = new ReadableStream({
start(c) {
c.error(new Error('waffles'));
}
});

t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction');

const reader = rs.getReader();
t.equal(IsReadableStreamDisturbed(rs), false,
'getReader() call has no effect on whether a stream is disturbed or not');

reader.read();
t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after read() call');

t.end();
});

test('IsReadableStreamDisturbed returns true for an errored stream on which cancel() has been called', t => {
const rs = new ReadableStream({
start(c) {
c.error(new Error('waffles'));
}
});

t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction');

const reader = rs.getReader();
t.equal(IsReadableStreamDisturbed(rs), false,
'getReader() call has no effect on whether a stream is disturbed or not');

reader.cancel();
t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after cancel() call');

t.end();
});

0 comments on commit bd839e5

Please sign in to comment.