Skip to content

Commit

Permalink
Add IsReadableStreamDisturbed predicate
Browse files Browse the repository at this point in the history
Closes #378.
  • Loading branch information
tyoshino authored and domenic committed Aug 18, 2015
1 parent 9945ae1 commit 8a4c61c
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
19 changes: 18 additions & 1 deletion index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
<td>A <a href="#rs-controller-class"><code>ReadableStreamController</code></a> created with the ability to control the
state and queue of this stream
</tr>
<tr>
<td>\[[disturbed]]
<td>A boolean flag set to <b>true</b> when the stream has been read from or canceled
</tr>
<tr>
<td>\[[pullAgain]]
<td>A boolean flag set to <b>true</b> if the stream's mechanisms requested a call to the underlying source's
Expand Down Expand Up @@ -398,6 +402,7 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
1. Set *this*@[[state]] to "readable".
1. Set *this*@[[started]], *this*@[[closeRequested]], *this*@[[pullAgain]], and *this*@[[pulling]] to *false*.
1. Set *this*@[[reader]] and *this*@[[storedError]] to *undefined*.
1. Set *this*@[[disturbed]] to *false*.
1. Set *this*@[[controller]] to Construct(`ReadableStreamController`, «*this*»).
1. Let _normalizedStrategy_ be ValidateAndNormalizeQueuingStrategy(_size_, _highWaterMark_).
1. Set *this*@[[strategySize]] to _normalizedStrategy_.[[size]] and *this*@[[strategyHWM]] to _normalizedStrategy_.[[highWaterMark]].
Expand Down Expand Up @@ -875,6 +880,7 @@ reader</a> for a given stream.
<emu-alg>
1. If _stream_@[[state]] is "closed", return a new promise resolved with *undefined*.
1. If _stream_@[[state]] is "errored", return a new promise rejected with _stream_@[[storedError]].
1. Set _stream_@[[disturbed]] to *true*.
1. Set _stream_@[[queue]] to a new empty List.
1. Perform FinishClosingReadableStream(_stream_).
1. Let _sourceCancelPromise_ be PromiseInvokeOrNoop(_stream_@[[underlyingSource]], "cancel", «‍_reason_»).
Expand Down Expand Up @@ -1012,7 +1018,17 @@ an assert).
1. Return *true*.
</emu-alg>

<h4 id="is-readable-stream-locked" aoid="IsReadableStreamLocked" nothrow>IsReadableStreamLocked ( stream )</h4>
<h4 id="is-readable-stream-disturbed" aoid="IsReadableStreamDisturbed">IsReadableStreamDisturbed ( stream )</h4>

This abstract operation is meant to be called from other specifications that may wish to query whether or not a
readable stream has ever been read from or canceled.

<emu-alg>
1. Assert: IsReadableStream(_stream_) is *true*.
1. Return _stream_@[[disturbed]].
</emu-alg>

<h4 id="is-readable-stream-locked" aoid="IsReadableStreamLocked">IsReadableStreamLocked ( stream )</h4>

This abstract operation is meant to be called from other specifications that may wish to query whether or not a
readable stream is <a>locked to a reader</a>.
Expand All @@ -1038,6 +1054,7 @@ readable stream is <a>locked to a reader</a>.
1. If _reader_@[[state]] is "errored", return a new promise rejected with _reader_@[[storedError]].
1. Assert: _reader_@[[ownerReadableStream]] is not *undefined*.
1. Assert: _reader_@[[ownerReadableStream]]@[[state]] is "readable".
1. Set _reader_@[[ownerReadableStream]]@[[disturbed]] to *true*.
1. If _reader_@[[ownerReadableStream]]@[[queue]] is not empty,
1. Let _chunk_ be DequeueValue(_reader_@[[ownerReadableStream]]@[[queue]]).
1. If _reader_@[[ownerReadableStream]]@[[closeRequested]] is *true* and _reader_@[[ownerReadableStream]]@[[queue]] is now empty, perform FinishClosingReadableStream(_reader_@[[ownerReadableStream]]).
Expand Down
12 changes: 12 additions & 0 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ export default class ReadableStream {
this._reader = undefined;
this._storedError = undefined;

this._disturbed = false;

const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
this._strategySize = normalizedStrategy.size;
this._strategyHWM = normalizedStrategy.highWaterMark;
Expand Down Expand Up @@ -357,6 +359,8 @@ function CancelReadableStream(stream, reason) {
return Promise.reject(stream._storedError);
}

stream._disturbed = true;

stream._queue = [];
FinishClosingReadableStream(stream);

Expand Down Expand Up @@ -493,6 +497,12 @@ function IsReadableStream(x) {
return true;
}

export function IsReadableStreamDisturbed(stream) {
assert(IsReadableStream(stream) === true, 'IsReadableStreamDisturbed should only be used on known readable streams');

return stream._disturbed;
}

function IsReadableStreamLocked(stream) {
assert(IsReadableStream(stream) === true, 'IsReadableStreamLocked should only be used on known readable streams');

Expand Down Expand Up @@ -539,6 +549,8 @@ function ReadFromReadableStreamReader(reader) {
assert(reader._ownerReadableStream !== undefined);
assert(reader._ownerReadableStream._state === 'readable');

reader._ownerReadableStream._disturbed = true;

if (reader._ownerReadableStream._queue.length > 0) {
const chunk = DequeueValue(reader._ownerReadableStream._queue);

Expand Down
33 changes: 33 additions & 0 deletions reference-implementation/test/abstract-ops.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
const test = require('tape-catch');

import { IsReadableStreamDisturbed } from '../lib/readable-stream'

test('IsReadableStreamDisturbed returns true for a stream on which read() has been called', t => {
const rs = new ReadableStream();

t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction');

const reader = rs.getReader();
t.equal(IsReadableStreamDisturbed(rs), false,
'getReader() call has no effect on whether a stream is disturbed or not');

reader.read();
t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after read() call');

t.end();
});

test('IsReadableStreamDisturbed returns true for a stream on which cancel() has been called', t => {
const rs = new ReadableStream();

t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction');

const reader = rs.getReader();
t.equal(IsReadableStreamDisturbed(rs), false,
'getReader() call has no effect on whether a stream is disturbed or not');

reader.cancel();
t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after cancel() call');

t.end();
});

0 comments on commit 8a4c61c

Please sign in to comment.