Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First draft at tee algorithms, for critique #302

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
<td>A promise that becomes fulfilled when the stream becomes closed; returned by the <code>closed</code> getter
</tr>
<tr>
<td>\[[draining]]
<td>A boolean flag indicating whether the stream has been closed, but still has chunks in its internal queue that
have not yet been read
<td>\[[closeRequested]]
<td>A boolean flag indicating whether the stream has been closed by its <a>underlying source</a>, but still has
<a>chunks</a> in its internal queue that have not yet been read
</tr>
<tr>
<td>\[[enqueue]]
Expand Down Expand Up @@ -358,7 +358,7 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
1. Set *this*@[[underlyingSource]] to _underlyingSource_.
1. Set *this*@[[queue]] to a new empty List.
1. Set *this*@[[state]] to "readable".
1. Set *this*@[[started]], *this*@[[draining]], and *this*@[[pullScheduled]] to *false*.
1. Set *this*@[[started]], *this*@[[closeRequested]], and *this*@[[pullScheduled]] to *false*.
1. Set *this*@[[reader]], *this*@[[pullingPromise]], and *this*@[[storedError]] to *undefined*.
1. Set *this*@[[enqueue]] to CreateReadableStreamEnqueueFunction(*this*).
1. Set *this*@[[close]] to CreateReadableStreamCloseFunction(*this*).
Expand Down Expand Up @@ -627,7 +627,7 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
1. Assert: *this*@[[ownerReadableStream]]@[[state]] is "readable".
1. If *this*@[[ownerReadableStream]]@[[queue]] is not empty,
1. Let _chunk_ be DequeueValue(*this*@[[ownerReadableStream]]@[[queue]]).
1. If *this*@[[ownerReadableStream]]@[[draining]] is *true* and *this*@[[ownerReadableStream]]@[[queue]] is now empty, call-with-rethrow CloseReadableStream(*this*@[[ownerReadableStream]]).
1. If *this*@[[ownerReadableStream]]@[[closeRequested]] is *true* and *this*@[[ownerReadableStream]]@[[queue]] is now empty, call-with-rethrow CloseReadableStream(*this*@[[ownerReadableStream]]).
1. Otherwise, call-with-rethrow CallReadableStreamPull(*this*@[[ownerReadableStream]]).
1. Return a new promise resolved with CreateIterResultObject(_chunk_, *false*).
1. Otherwise,
Expand Down Expand Up @@ -673,7 +673,7 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
<h4 id="call-readable-stream-pull" aoid="CallReadableStreamPull">CallReadableStreamPull ( stream )</h4>

<pre is="emu-alg">
1. If _stream_@[[draining]] is *true* or _stream_@[[started]] is *false* or _stream_@[[state]] is "closed" or _stream_@[[state]] is "errored" or _stream_@[[pullScheduled]] is *true*, return *undefined*.
1. If _stream_@[[closeRequested]] is *true* or _stream_@[[started]] is *false* or _stream_@[[state]] is "closed" or _stream_@[[state]] is "errored" or _stream_@[[pullScheduled]] is *true*, return *undefined*.
1. If _stream_@[[pullingPromise]] is not *undefined*,
1. Set _stream_@[[pullScheduled]] to *true*.
1. Upon fulfillment of _stream_@[[pullingPromise]],
Expand Down Expand Up @@ -728,11 +728,19 @@ A <dfn>Readable Stream Close Function</dfn> is a built-in anonymous function of
<var>stream</var>, that performs the following steps:

<pre is="emu-alg">
1. If _stream_@[[state]] is not "readable", return *undefined*.
1. If _stream_@[[closeRequested]] is *true*, throw a *TypeError* exception.
1. If _stream_@[[state]] is "errored", throw a *TypeError* exception.
1. If _stream_@[[state]] is "closed", return *undefined*.
1. Set _stream_@[[closeRequested]] to *true*.
1. If _stream_@[[queue]] is empty, return CloseReadableStream(_stream_).
1. Set _stream_@[[draining]] to *true*.
</pre>

<div class="note">
The case where <var>stream</var>@\[[state]] is <code>"closed"</code>, but <var>stream</var>@\[[closeRequested]] is
<emu-val>false</emu-val>, will happen if the stream was closed without this close function ever being called: i.e.,
if the stream was closed by a call to <code>stream.cancel()</code>.
</div>

<h4 id="create-readable-stream-enqueue-function" aoid="CreateReadableStreamEnqueueFunction">CreateReadableStreamEnqueueFunction ( stream )</h4>

<pre is="emu-alg">
Expand All @@ -745,7 +753,7 @@ closing over a variable <var>stream</var>, that performs the following steps:
<pre is="emu-alg">
1. If _stream_@[[state]] is "errored", throw _stream_@[[storedError]].
1. If _stream_@[[state]] is "closed", throw a *TypeError* exception.
1. If _stream_@[[draining]] is *true*, throw a *TypeError* exception.
1. If _stream_@[[closeRequested]] is *true*, throw a *TypeError* exception.
1. If IsReadableStreamLocked(_stream_) is *true* and _stream_@[[reader]]@[[readRequests]] is not empty,
1. Let _readRequestPromise_ be the first element of _stream_@[[reader]]@[[readRequests]].
1. Remove _readRequestPromise_ from _stream_@[[reader]]@[[readRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
Expand Down Expand Up @@ -785,7 +793,7 @@ A <dfn>Readable Stream Error Function</dfn> is a built-in anonymous function of
a variable <var>stream</var>, that performs the following steps:

<pre is="emu-alg">
1. If _stream_@[[state]] is not "readable" return *undefined*.
1. If _stream_@[[state]] is not "readable", throw a *TypeError* exception.
1. Let _stream_@[[queue]] be a new empty List.
1. Set _stream_@[[storedError]] to _e_.
1. Set _stream_@[[state]] to "errored".
Expand Down
176 changes: 138 additions & 38 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export default class ReadableStream {
this._queue = [];
this._state = 'readable';
this._started = false;
this._draining = false;
this._closeRequested = false;
this._pullScheduled = false;
this._reader = undefined;
this._pullingPromise = undefined;
Expand Down Expand Up @@ -141,6 +141,14 @@ export default class ReadableStream {
rejectPipeToPromise(reason);
}
}

tee() {
if (IsReadableStream(this) === false) {
throw new TypeError('ReadableStream.prototype.getReader can only be used on a ReadableStream');
}

return TeeReadableStream(this, false);
}
}

class ReadableStreamReader {
Expand Down Expand Up @@ -204,37 +212,7 @@ class ReadableStreamReader {
new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader'));
}

if (this._state === 'closed') {
return Promise.resolve(CreateIterResultObject(undefined, true));
}

if (this._state === 'errored') {
return Promise.reject(this._storedError);
}

assert(this._ownerReadableStream !== undefined);
assert(this._ownerReadableStream._state === 'readable');

if (this._ownerReadableStream._queue.length > 0) {
const chunk = DequeueValue(this._ownerReadableStream._queue);

if (this._ownerReadableStream._draining === true && this._ownerReadableStream._queue.length === 0) {
CloseReadableStream(this._ownerReadableStream);
} else {
CallReadableStreamPull(this._ownerReadableStream);
}

return Promise.resolve(CreateIterResultObject(chunk, false));
} else {
const readRequest = {};
readRequest.promise = new Promise((resolve, reject) => {
readRequest._resolve = resolve;
readRequest._reject = reject;
});

this._readRequests.push(readRequest);
return readRequest.promise;
}
return ReadFromReadableStreamReader(this);
}

releaseLock() {
Expand All @@ -259,7 +237,7 @@ function AcquireReadableStreamReader(stream) {
}

function CallReadableStreamPull(stream) {
if (stream._draining === true || stream._started === false ||
if (stream._closeRequested === true || stream._started === false ||
stream._state === 'closed' || stream._state === 'errored' ||
stream._pullScheduled === true) {
return undefined;
Expand Down Expand Up @@ -317,15 +295,23 @@ function CloseReadableStream(stream) {

function CreateReadableStreamCloseFunction(stream) {
return () => {
if (stream._state !== 'readable') {
if (stream._closeRequested === true) {
throw new TypeError('The stream has already been closed; do not close it again!');
}
if (stream._state === 'errored') {
throw new TypeError('The stream is in an errored state and cannot be closed');
}

if (stream._state === 'closed') {
// This will happen if the stream was closed without close() being called, i.e. by a call to stream.cancel()
return undefined;
}

stream._closeRequested = true;

if (stream._queue.length === 0) {
return CloseReadableStream(stream);
}

stream._draining = true;
};
}

Expand All @@ -339,7 +325,7 @@ function CreateReadableStreamEnqueueFunction(stream) {
throw new TypeError('stream is closed');
}

if (stream._draining === true) {
if (stream._closeRequested === true) {
throw new TypeError('stream is draining');
}

Expand Down Expand Up @@ -387,7 +373,7 @@ function CreateReadableStreamEnqueueFunction(stream) {
function CreateReadableStreamErrorFunction(stream) {
return e => {
if (stream._state !== 'readable') {
return;
throw new TypeError(`The stream is ${stream._state} and so cannot be errored`);
}

stream._queue = [];
Expand Down Expand Up @@ -434,6 +420,40 @@ function IsReadableStreamReader(x) {
return true;
}

function ReadFromReadableStreamReader(reader) {
if (reader._state === 'closed') {
return Promise.resolve(CreateIterResultObject(undefined, true));
}

if (reader._state === 'errored') {
return Promise.reject(reader._storedError);
}

assert(reader._ownerReadableStream !== undefined);
assert(reader._ownerReadableStream._state === 'readable');

if (reader._ownerReadableStream._queue.length > 0) {
const chunk = DequeueValue(reader._ownerReadableStream._queue);

if (reader._ownerReadableStream._closeRequested === true && reader._ownerReadableStream._queue.length === 0) {
CloseReadableStream(reader._ownerReadableStream);
} else {
CallReadableStreamPull(reader._ownerReadableStream);
}

return Promise.resolve(CreateIterResultObject(chunk, false));
} else {
const readRequest = {};
readRequest.promise = new Promise((resolve, reject) => {
readRequest._resolve = resolve;
readRequest._reject = reject;
});

reader._readRequests.push(readRequest);
return readRequest.promise;
}
}

function ReleaseReadableStreamReader(reader) {
assert(reader._ownerReadableStream !== undefined);

Expand Down Expand Up @@ -484,3 +504,83 @@ function ShouldReadableStreamApplyBackpressure(stream) {

return shouldApplyBackpressure;
}

function TeeReadableStream(stream, clone) {
assert(IsReadableStream(stream) === true);
const reader = AcquireReadableStreamReader(stream);

let canceled1 = false;
let cancelReason1 = undefined;
let canceled2 = false;
let cancelReason2 = undefined;
let closedOrErrored = false;

let cancelPromise_resolve;
const cancelPromise = new Promise((resolve, reject) => {
cancelPromise_resolve = resolve;
});

const branch1 = new ReadableStream({
pull: readAndEnqueueInBoth,
cancel(reason) {
canceled1 = true;
cancelReason1 = reason;
maybeCancelSource();
return cancelPromise;
}
});

const branch2 = new ReadableStream({
pull: readAndEnqueueInBoth,
cancel(reason) {
canceled2 = true;
cancelReason2 = reason;
maybeCancelSource();
return cancelPromise;
}
});

reader._closedPromise.catch(e => {
if (!closedOrErrored) {
branch1._error(e);
branch2._error(e);
closedOrErrored = true;
}
});

return [branch1, branch2];

function readAndEnqueueInBoth() {
ReadFromReadableStreamReader(reader).then(({ value, done }) => {
if (done && !closedOrErrored) {
branch1._close();
branch2._close();
closedOrErrored = true;
}

if (closedOrErrored) {
return;
}

let value1 = value;
let value2 = value;
if (clone) {
value1 = StructuredClone(value);
value2 = StructuredClone(value);
}

if (canceled1 === false) {
branch1._enqueue(value1);
}
if (canceled2 === false) {
branch2._enqueue(value2);
}
});
}

function maybeCancelSource() {
if (canceled1 === true && canceled2 === true) {
cancelPromise_resolve(CancelReadableStream(stream, [cancelReason1, cancelReason2]));
}
}
}
3 changes: 3 additions & 0 deletions reference-implementation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,8 @@
"text-table": "^0.2.0",
"traceur": "0.0.84",
"traceur-runner": "^1.0.2"
},
"dependencies": {
"cyclonejs": "^1.1.1"
}
}
Loading