Skip to content

Commit

Permalink
Factor out ReadFromReadableStreamReader abstract operation
Browse files Browse the repository at this point in the history
Again, needed for formalizing the tee algorithm from #302.
  • Loading branch information
domenic committed Apr 6, 2015
1 parent 62b8a68 commit 05674df
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 45 deletions.
34 changes: 20 additions & 14 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -693,20 +693,8 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
</div>

<pre is="emu-alg">
1. If IsReadableStreamReader(*this*) is *false*, throw a *TypeError* exception.
1. If *this*@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(*undefined*, *true*).
1. If *this*@[[state]] is "errored", return a new promise rejected with *this*@[[storedError]].
1. Assert: *this*@[[ownerReadableStream]] is not *undefined*.
1. Assert: *this*@[[ownerReadableStream]]@[[state]] is "readable".
1. If *this*@[[ownerReadableStream]]@[[queue]] is not empty,
1. Let _chunk_ be DequeueValue(*this*@[[ownerReadableStream]]@[[queue]]).
1. If *this*@[[ownerReadableStream]]@[[closeRequested]] is *true* and *this*@[[ownerReadableStream]]@[[queue]] is now empty, call-with-rethrow FinishClosingReadableStream(*this*@[[ownerReadableStream]]).
1. Otherwise, call-with-rethrow CallReadableStreamPull(*this*@[[ownerReadableStream]]).
1. Return a new promise resolved with CreateIterResultObject(_chunk_, *false*).
1. Otherwise,
1. Let _readRequestPromise_ be a new promise.
1. Append _readRequestPromise_ as the last element of *this*@[[readRequests]].
1. Return _readRequestPromise_.
1. If IsReadableStreamReader(*this*) is *false*, return a promise rejected with a *TypeError* exception.
1. Return ReadFromReadableStreamReader(*this*).
</pre>

<h5 id="reader-release-lock">releaseLock()</h5>
Expand Down Expand Up @@ -886,6 +874,24 @@ readable stream is <a>locked to a reader</a>.
1. Return *true*.
</pre>

<h4 id="read-from-readable-stream-reader" aoid="ReadFromReadableStreamReader">ReadFromReadableStreamReader ( reader )</h4>

<pre is="emu-alg">
1. If _reader_@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(*undefined*, *true*).
1. If _reader_@[[state]] is "errored", return a new promise rejected with _reader_@[[storedError]].
1. Assert: _reader_@[[ownerReadableStream]] is not *undefined*.
1. Assert: _reader_@[[ownerReadableStream]]@[[state]] is "readable".
1. If _reader_@[[ownerReadableStream]]@[[queue]] is not empty,
1. Let _chunk_ be DequeueValue(_reader_@[[ownerReadableStream]]@[[queue]]).
1. If _reader_@[[ownerReadableStream]]@[[closeRequested]] is *true* and _reader_@[[ownerReadableStream]]@[[queue]] is now empty, call-with-rethrow FinishClosingReadableStream(_reader_@[[ownerReadableStream]]).
1. Otherwise, call-with-rethrow CallReadableStreamPull(_reader_@[[ownerReadableStream]]).
1. Return a new promise resolved with CreateIterResultObject(_chunk_, *false*).
1. Otherwise,
1. Let _readRequestPromise_ be a new promise.
1. Append _readRequestPromise_ as the last element of _reader_@[[readRequests]].
1. Return _readRequestPromise_.
</pre>

<h4 id="release-readable-stream-reader" aoid="ReleaseReadableStreamReader">ReleaseReadableStreamReader ( reader )</h4>

<pre is="emu-alg">
Expand Down
66 changes: 35 additions & 31 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -267,37 +267,7 @@ class ReadableStreamReader {
new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader'));
}

if (this._state === 'closed') {
return Promise.resolve(CreateIterResultObject(undefined, true));
}

if (this._state === 'errored') {
return Promise.reject(this._storedError);
}

assert(this._ownerReadableStream !== undefined);
assert(this._ownerReadableStream._state === 'readable');

if (this._ownerReadableStream._queue.length > 0) {
const chunk = DequeueValue(this._ownerReadableStream._queue);

if (this._ownerReadableStream._closeRequested === true && this._ownerReadableStream._queue.length === 0) {
FinishClosingReadableStream(this._ownerReadableStream);
} else {
CallReadableStreamPull(this._ownerReadableStream);
}

return Promise.resolve(CreateIterResultObject(chunk, false));
} else {
const readRequest = {};
readRequest.promise = new Promise((resolve, reject) => {
readRequest._resolve = resolve;
readRequest._reject = reject;
});

this._readRequests.push(readRequest);
return readRequest.promise;
}
return ReadFromReadableStreamReader(this);
}

releaseLock() {
Expand Down Expand Up @@ -498,6 +468,40 @@ function IsReadableStreamReader(x) {
return true;
}

function ReadFromReadableStreamReader(reader) {
if (reader._state === 'closed') {
return Promise.resolve(CreateIterResultObject(undefined, true));
}

if (reader._state === 'errored') {
return Promise.reject(reader._storedError);
}

assert(reader._ownerReadableStream !== undefined);
assert(reader._ownerReadableStream._state === 'readable');

if (reader._ownerReadableStream._queue.length > 0) {
const chunk = DequeueValue(reader._ownerReadableStream._queue);

if (reader._ownerReadableStream._closeRequested === true && reader._ownerReadableStream._queue.length === 0) {
FinishClosingReadableStream(reader._ownerReadableStream);
} else {
CallReadableStreamPull(reader._ownerReadableStream);
}

return Promise.resolve(CreateIterResultObject(chunk, false));
} else {
const readRequest = {};
readRequest.promise = new Promise((resolve, reject) => {
readRequest._resolve = resolve;
readRequest._reject = reject;
});

reader._readRequests.push(readRequest);
return readRequest.promise;
}
}

function ReleaseReadableStreamReader(reader) {
assert(reader._ownerReadableStream !== undefined);

Expand Down

0 comments on commit 05674df

Please sign in to comment.