From 98685d48509db6da3be3e5f45c15cda8729ae044 Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Thu, 19 Mar 2015 16:56:32 +0900 Subject: [PATCH 1/7] Throw on underlying source double-close or double-error Fixes #298. --- index.bs | 28 ++++-- .../lib/readable-stream.js | 24 +++-- .../test/bad-underlying-sources.js | 99 +++++++++++++++++++ .../test/readable-stream.js | 51 ---------- 4 files changed, 133 insertions(+), 69 deletions(-) diff --git a/index.bs b/index.bs index f7db4bebe..8b608d3cf 100644 --- a/index.bs +++ b/index.bs @@ -270,9 +270,9 @@ Instances of ReadableStream are created with the internal slots des A promise that becomes fulfilled when the stream becomes closed; returned by the closed getter - \[[draining]] - A boolean flag indicating whether the stream has been closed, but still has chunks in its internal queue that - have not yet been read + \[[closeRequested]] + A boolean flag indicating whether the stream has been closed by its underlying source, but still has + chunks in its internal queue that have not yet been read \[[enqueue]] @@ -358,7 +358,7 @@ Instances of ReadableStream are created with the internal slots des 1. Set *this*@[[underlyingSource]] to _underlyingSource_. 1. Set *this*@[[queue]] to a new empty List. 1. Set *this*@[[state]] to "readable". - 1. Set *this*@[[started]], *this*@[[draining]], and *this*@[[pullScheduled]] to *false*. + 1. Set *this*@[[started]], *this*@[[closeRequested]], and *this*@[[pullScheduled]] to *false*. 1. Set *this*@[[reader]], *this*@[[pullingPromise]], and *this*@[[storedError]] to *undefined*. 1. Set *this*@[[enqueue]] to CreateReadableStreamEnqueueFunction(*this*). 1. Set *this*@[[close]] to CreateReadableStreamCloseFunction(*this*). @@ -627,7 +627,7 @@ Instances of ReadableStreamReader are created with the internal slo 1. Assert: *this*@[[ownerReadableStream]]@[[state]] is "readable". 1. If *this*@[[ownerReadableStream]]@[[queue]] is not empty, 1. Let _chunk_ be DequeueValue(*this*@[[ownerReadableStream]]@[[queue]]). - 1. If *this*@[[ownerReadableStream]]@[[draining]] is *true* and *this*@[[ownerReadableStream]]@[[queue]] is now empty, call-with-rethrow CloseReadableStream(*this*@[[ownerReadableStream]]). + 1. If *this*@[[ownerReadableStream]]@[[closeRequested]] is *true* and *this*@[[ownerReadableStream]]@[[queue]] is now empty, call-with-rethrow CloseReadableStream(*this*@[[ownerReadableStream]]). 1. Otherwise, call-with-rethrow CallReadableStreamPull(*this*@[[ownerReadableStream]]). 1. Return a new promise resolved with CreateIterResultObject(_chunk_, *false*). 1. Otherwise, @@ -673,7 +673,7 @@ Instances of ReadableStreamReader are created with the internal slo

CallReadableStreamPull ( stream )

-  1. If _stream_@[[draining]] is *true* or _stream_@[[started]] is *false* or _stream_@[[state]] is "closed" or _stream_@[[state]] is "errored" or _stream_@[[pullScheduled]] is *true*, return *undefined*.
+  1. If _stream_@[[closeRequested]] is *true* or _stream_@[[started]] is *false* or _stream_@[[state]] is "closed" or _stream_@[[state]] is "errored" or _stream_@[[pullScheduled]] is *true*, return *undefined*.
   1. If _stream_@[[pullingPromise]] is not *undefined*,
     1. Set _stream_@[[pullScheduled]] to *true*.
     1. Upon fulfillment of _stream_@[[pullingPromise]],
@@ -728,11 +728,19 @@ A Readable Stream Close Function is a built-in anonymous function of
 stream, that performs the following steps:
 
 
-  1. If _stream_@[[state]] is not "readable", return *undefined*.
+  1. If _stream_@[[closeRequested]] is *true*, throw a *TypeError* exception.
+  1. If _stream_@[[state]] is "errored", throw a *TypeError* exception.
+  1. If _stream_@[[state]] is "closed", return *undefined*.
+  1. Set _stream_@[[closeRequested]] to *true*.
   1. If _stream_@[[queue]] is empty, return CloseReadableStream(_stream_).
-  1. Set _stream_@[[draining]] to *true*.
 
+
+ The case where stream@\[[state]] is "closed", but stream@\[[closeRequested]] is + false, will happen if the stream was closed without this close function ever being called: i.e., + if the stream was closed by a call to stream.cancel(). +
+

CreateReadableStreamEnqueueFunction ( stream )

@@ -745,7 +753,7 @@ closing over a variable stream, that performs the following steps:
 
   1. If _stream_@[[state]] is "errored", throw _stream_@[[storedError]].
   1. If _stream_@[[state]] is "closed", throw a *TypeError* exception.
-  1. If _stream_@[[draining]] is *true*, throw a *TypeError* exception.
+  1. If _stream_@[[closeRequested]] is *true*, throw a *TypeError* exception.
   1. If IsReadableStreamLocked(_stream_) is *true* and _stream_@[[reader]]@[[readRequests]] is not empty,
     1. Let _readRequestPromise_ be the first element of _stream_@[[reader]]@[[readRequests]].
     1. Remove _readRequestPromise_ from _stream_@[[reader]]@[[readRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
@@ -785,7 +793,7 @@ A Readable Stream Error Function is a built-in anonymous function of
 a variable stream, that performs the following steps:
 
 
-  1. If _stream_@[[state]] is not "readable" return *undefined*.
+  1. If _stream_@[[state]] is not "readable", throw a *TypeError* exception.
   1. Let _stream_@[[queue]] be a new empty List.
   1. Set _stream_@[[storedError]] to _e_.
   1. Set _stream_@[[state]] to "errored".
diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js
index 50f5baced..3d4ede74f 100644
--- a/reference-implementation/lib/readable-stream.js
+++ b/reference-implementation/lib/readable-stream.js
@@ -8,7 +8,7 @@ export default class ReadableStream {
     this._queue = [];
     this._state = 'readable';
     this._started = false;
-    this._draining = false;
+    this._closeRequested = false;
     this._pullScheduled = false;
     this._reader = undefined;
     this._pullingPromise = undefined;
@@ -218,7 +218,7 @@ class ReadableStreamReader {
     if (this._ownerReadableStream._queue.length > 0) {
       const chunk = DequeueValue(this._ownerReadableStream._queue);
 
-      if (this._ownerReadableStream._draining === true && this._ownerReadableStream._queue.length === 0) {
+      if (this._ownerReadableStream._closeRequested === true && this._ownerReadableStream._queue.length === 0) {
         CloseReadableStream(this._ownerReadableStream);
       } else {
         CallReadableStreamPull(this._ownerReadableStream);
@@ -259,7 +259,7 @@ function AcquireReadableStreamReader(stream) {
 }
 
 function CallReadableStreamPull(stream) {
-  if (stream._draining === true || stream._started === false ||
+  if (stream._closeRequested === true || stream._started === false ||
       stream._state === 'closed' || stream._state === 'errored' ||
       stream._pullScheduled === true) {
     return undefined;
@@ -317,15 +317,23 @@ function CloseReadableStream(stream) {
 
 function CreateReadableStreamCloseFunction(stream) {
   return () => {
-    if (stream._state !== 'readable') {
+    if (stream._closeRequested === true) {
+      throw new TypeError('The stream has already been closed; do not close it again!');
+    }
+    if (stream._state === 'errored') {
+      throw new TypeError('The stream is in an errored state and cannot be closed');
+    }
+
+    if (stream._state === 'closed') {
+      // This will happen if the stream was closed without close() being called, i.e. by a call to stream.cancel()
       return undefined;
     }
 
+    stream._closeRequested = true;
+
     if (stream._queue.length === 0) {
       return CloseReadableStream(stream);
     }
-
-    stream._draining = true;
   };
 }
 
@@ -339,7 +347,7 @@ function CreateReadableStreamEnqueueFunction(stream) {
       throw new TypeError('stream is closed');
     }
 
-    if (stream._draining === true) {
+    if (stream._closeRequested === true) {
       throw new TypeError('stream is draining');
     }
 
@@ -387,7 +395,7 @@ function CreateReadableStreamEnqueueFunction(stream) {
 function CreateReadableStreamErrorFunction(stream) {
   return e => {
     if (stream._state !== 'readable') {
-      return;
+      throw new TypeError(`The stream is ${stream._state} and so cannot be errored`);
     }
 
     stream._queue = [];
diff --git a/reference-implementation/test/bad-underlying-sources.js b/reference-implementation/test/bad-underlying-sources.js
index c24334ede..3f3e02874 100644
--- a/reference-implementation/test/bad-underlying-sources.js
+++ b/reference-implementation/test/bad-underlying-sources.js
@@ -321,3 +321,102 @@ test('Underlying source: strategy.size returning +Infinity', t => {
 
   rs.getReader().closed.catch(e => t.equal(e, theError, 'closed should reject with the error'));
 });
+
+test('Underlying source: calling close twice on an empty stream should throw the second time', t => {
+  t.plan(2);
+
+  new ReadableStream({
+    start(enqueue, close) {
+      close();
+      t.throws(close, /TypeError/, 'second call to close should throw a TypeError');
+    }
+  })
+  .getReader().closed.then(() => t.pass('closed should fulfill'));
+});
+
+test('Underlying source: calling close twice on a non-empty stream should throw the second time', t => {
+  t.plan(3);
+
+  const reader = new ReadableStream({
+    start(enqueue, close) {
+      enqueue('a');
+      close();
+      t.throws(close, /TypeError/, 'second call to close should throw a TypeError');
+    }
+  })
+  .getReader();
+
+  reader.read().then(r => t.deepEqual(r, { value: 'a', done: false }, 'read() should read the enqueued chunk'));
+  reader.closed.then(() => t.pass('closed should fulfill'));
+});
+
+test('Underlying source: calling close on an empty canceled stream should not throw', t => {
+  t.plan(2);
+
+  let doClose;
+  const rs = new ReadableStream({
+    start(enqueue, close) {
+      doClose = close;
+    }
+  });
+
+  rs.cancel();
+  t.doesNotThrow(doClose, 'calling close after canceling should not throw anything');
+
+  rs.getReader().closed.then(() => t.pass('closed should fulfill'));
+});
+
+test('Underlying source: calling close on a non-empty canceled stream should not throw', t => {
+  t.plan(2);
+
+  let doClose;
+  const rs = new ReadableStream({
+    start(enqueue, close) {
+      enqueue('a');
+      doClose = close;
+    }
+  });
+
+  rs.cancel();
+  t.doesNotThrow(doClose, 'calling close after canceling should not throw anything');
+
+  rs.getReader().closed.then(() => t.pass('closed should fulfill'));
+});
+
+test('Underlying source: calling close after error should throw', t => {
+  t.plan(2);
+
+  const theError = new Error('boo');
+  new ReadableStream({
+    start(enqueue, close, error) {
+      error(theError);
+      t.throws(close, /TypeError/, 'call to close should throw a TypeError');
+    }
+  })
+  .getReader().closed.catch(e => t.equal(e, theError, 'closed should reject with the error'));
+});
+
+test('Underlying source: calling error twice should throw the second time', t => {
+  t.plan(2);
+
+  const theError = new Error('boo');
+  new ReadableStream({
+    start(enqueue, close, error) {
+      error(theError);
+      t.throws(error, /TypeError/, 'second call to error should throw a TypeError');
+    }
+  })
+  .getReader().closed.catch(e => t.equal(e, theError, 'closed should reject with the error'));
+});
+
+test('Underlying source: calling error after close should throw', t => {
+  t.plan(2);
+
+  new ReadableStream({
+    start(enqueue, close, error) {
+      close();
+      t.throws(error, /TypeError/, 'call to error should throw a TypeError');
+    }
+  })
+  .getReader().closed.then(() => t.pass('closed should fulfill'));
+});
diff --git a/reference-implementation/test/readable-stream.js b/reference-implementation/test/readable-stream.js
index ca256790a..a1fe45925 100644
--- a/reference-implementation/test/readable-stream.js
+++ b/reference-implementation/test/readable-stream.js
@@ -38,57 +38,6 @@ test('ReadableStream: if pull rejects, it should error the stream', t => {
   });
 });
 
-test('ReadableStream: calling close twice should be a no-op', t => {
-  t.plan(2);
-
-  new ReadableStream({
-    start(enqueue, close) {
-      close();
-      t.doesNotThrow(close);
-    }
-  })
-  .getReader().closed.then(() => t.pass('closed should fulfill'));
-});
-
-test('ReadableStream: calling error twice should be a no-op', t => {
-  t.plan(2);
-
-  const theError = new Error('boo!');
-  const error2 = new Error('not me!');
-  new ReadableStream({
-    start(enqueue, close, error) {
-      error(theError);
-      t.doesNotThrow(() => error(error2));
-    }
-  })
-  .getReader().closed.catch(e => t.equal(e, theError, 'closed should reject with the first error'));
-});
-
-test('ReadableStream: calling error after close should be a no-op', t => {
-  t.plan(2);
-
-  new ReadableStream({
-    start(enqueue, close, error) {
-      close();
-      t.doesNotThrow(error);
-    }
-  })
-  .getReader().closed.then(() => t.pass('closed should fulfill'));
-});
-
-test('ReadableStream: calling close after error should be a no-op', t => {
-  t.plan(2);
-
-  const theError = new Error('boo!');
-  new ReadableStream({
-    start(enqueue, close, error) {
-      error(theError);
-      t.doesNotThrow(close);
-    }
-  })
-  .getReader().closed.catch(e => t.equal(e, theError, 'closed should reject with the first error'));
-});
-
 test('ReadableStream: should only call pull once upon starting the stream', t => {
   t.plan(2);
 

From b593a3f6c926c3ce2335c06a2d965bdb93773228 Mon Sep 17 00:00:00 2001
From: Domenic Denicola 
Date: Wed, 18 Mar 2015 20:36:11 +0900
Subject: [PATCH 2/7] First draft at tee algorithms, for critique

---
 .../lib/readable-stream.js                    | 216 ++++++++++++++++++
 1 file changed, 216 insertions(+)

diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js
index 3d4ede74f..87a992e53 100644
--- a/reference-implementation/lib/readable-stream.js
+++ b/reference-implementation/lib/readable-stream.js
@@ -492,3 +492,219 @@ function ShouldReadableStreamApplyBackpressure(stream) {
 
   return shouldApplyBackpressure;
 }
+
+function TeeReadableStream(stream, clone) {
+  const reader = stream.getReader();
+
+  let enqueue1, enqueue2, close1, close2, error1, error2;
+  let canceled1, cancelReason1, canceled2, cancelReason2;
+  const branch1 = new ReadableStream({
+    start(enqueue, close, error) {
+      [enqueue1, close1, error1] = [enqueue, close, error];
+    },
+    cancel(reason) {
+      canceled1 = true;
+      cancelReason1 = reason;
+      maybeCancelSource();
+    }
+  });
+  const branch2 = new ReadableStream({
+    start(enqueue, close, error) {
+      [enqueue2, close2, error2] = [enqueue, close, error];
+    },
+    cancel(reason) {
+      canceled2 = true;
+      cancelReason2 = reason;
+      maybeCancelSource();
+    }
+  });
+
+  pump();
+
+  return [branch1, branch2];
+
+  function pump() {
+    reader.read().then(
+      ({ value, done }) => {
+        if (done) {
+          close1();
+          close2();
+          return;
+        }
+
+        if (clone) {
+          enqueue1(StructuredClone(value));
+          enqueue2(StructuredClone(value));
+        } else {
+          enqueue1(value);
+          enqueue2(value);
+        }
+        pump();
+      },
+      e => {
+        error1(e);
+        error2(e);
+      }
+    );
+  }
+
+  function maybeCancelSource() {
+    if (canceled1 && canceled2) {
+      reader.cancel([cancelReason1, cancelReason2]);
+    }
+  }
+}
+
+function TeeReadableStream2(stream, clone) {
+  const reader = stream.getReader();
+
+  let enqueue1, enqueue2, close1, close2, error1, error2;
+  let canceled1, cancelReason1, canceled2, cancelReason2;
+  const branch1 = new ReadableStream({
+    start(enqueue, close, error) {
+      [enqueue1, close1, error1] = [enqueue, close, error];
+    },
+    pull: readAndEnqueueInBoth,
+    cancel(reason) {
+      canceled1 = true;
+      cancelReason1 = reason;
+      maybeCancelSource();
+    }
+  });
+  const branch2 = new ReadableStream({
+    start(enqueue, close, error) {
+      [enqueue2, close2, error2] = [enqueue, close, error];
+    },
+    pull: readAndEnqueueInBoth,
+    cancel(reason) {
+      canceled2 = true;
+      cancelReason2 = reason;
+      maybeCancelSource();
+    }
+  });
+
+  return [branch1, branch2];
+
+  function readAndEnqueueInBoth() {
+    reader.read().then(
+      ({ value, done }) => {
+        if (done) {
+          close1();
+          close2();
+          return;
+        }
+
+        if (clone) {
+          enqueue1(StructuredClone(value));
+          enqueue2(StructuredClone(value));
+        } else {
+          enqueue1(value);
+          enqueue2(value);
+        }
+      },
+      e => {
+        error1(e);
+        error2(e);
+      }
+    );
+  }
+
+  function maybeCancelSource() {
+    if (canceled1 && canceled2) {
+      reader.cancel([cancelReason1, cancelReason2]);
+    }
+  }
+}
+
+
+function SpeculativeTeeReadableByteStream(stream, clone) {
+  const reader = stream.getReader({ feedBuffers: true });
+
+  let enqueue1, close1, error1, enqueue2, close2, error2;
+  let canceled1, cancelReason1, canceled2, cancelReason2;
+  const branch1 = new ReadableStream({
+    start(enqueue, close, error) {
+      [enqueue1, close1, error1] = [enqueue, close, error];
+    },
+    read: readIntoAndEnqueueInOther(enqueue2),
+    pull: readAndEnqueueInBoth,
+    cancel(reason) {
+      canceled1 = true;
+      cancelReason1 = reason;
+      maybeCancelSource();
+    }
+  });
+
+  const branch2 = new ReadableStream({
+    start(enqueue, close, error) {
+      [enqueue2, close2, error2] = [enqueue, close, error];
+    },
+    read: readIntoAndEnqueueInOther(enqueue1),
+    pull: readAndEnqueueInBoth,
+    cancel(reason) {
+      canceled2 = true;
+      cancelReason2 = reason;
+      maybeCancelSource();
+    }
+  });
+
+  return [branch1, branch2];
+
+  function readIntoAndEnqueueInOther(otherEnqueue) {
+    return (view, ready) => {
+      reader.read(view).then(
+        ({ value, done }) => {
+          if (done) {
+            close1();
+            close2();
+            return;
+          }
+
+          if (clone) {
+            otherEnqueue(StructuredClone(value));
+          } else {
+            otherEnqueue(value);
+          }
+
+          ready();
+        },
+        e => {
+          error1(e);
+          error2(e);
+        }
+      );
+    };
+  }
+
+  function readAndEnqueueInBoth() {
+    // Assuming FeedBufferReadableByteStreamReaders (or whatever) allow read() with no arguments still,
+    // which they probably should (?).
+    reader.read().then(
+      ({ value, done }) => {
+        if (done) {
+          close1();
+          close2();
+          return;
+        }
+
+        if (clone) {
+          enqueue1(StructuredClone(value));
+          enqueue2(StructuredClone(value));
+        } else {
+          enqueue1(value);
+          enqueue2(value);
+        }
+      },
+      e => {
+        error1(e);
+        error2(e);
+      }
+    );
+  }
+
+  function maybeCancelSource() {
+    if (canceled1 && canceled2) {
+      reader.cancel([cancelReason1, cancelReason2]);
+    }
+  }
+}

From 5202e1036906418b3ab3ac263ab4672857b74169 Mon Sep 17 00:00:00 2001
From: Domenic Denicola 
Date: Wed, 18 Mar 2015 20:51:21 +0900
Subject: [PATCH 3/7] Remove original TeeReadableStream with broken
 backpressure

---
 .../lib/readable-stream.js                    | 62 -------------------
 1 file changed, 62 deletions(-)

diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js
index 87a992e53..9c36aada1 100644
--- a/reference-implementation/lib/readable-stream.js
+++ b/reference-implementation/lib/readable-stream.js
@@ -496,68 +496,6 @@ function ShouldReadableStreamApplyBackpressure(stream) {
 function TeeReadableStream(stream, clone) {
   const reader = stream.getReader();
 
-  let enqueue1, enqueue2, close1, close2, error1, error2;
-  let canceled1, cancelReason1, canceled2, cancelReason2;
-  const branch1 = new ReadableStream({
-    start(enqueue, close, error) {
-      [enqueue1, close1, error1] = [enqueue, close, error];
-    },
-    cancel(reason) {
-      canceled1 = true;
-      cancelReason1 = reason;
-      maybeCancelSource();
-    }
-  });
-  const branch2 = new ReadableStream({
-    start(enqueue, close, error) {
-      [enqueue2, close2, error2] = [enqueue, close, error];
-    },
-    cancel(reason) {
-      canceled2 = true;
-      cancelReason2 = reason;
-      maybeCancelSource();
-    }
-  });
-
-  pump();
-
-  return [branch1, branch2];
-
-  function pump() {
-    reader.read().then(
-      ({ value, done }) => {
-        if (done) {
-          close1();
-          close2();
-          return;
-        }
-
-        if (clone) {
-          enqueue1(StructuredClone(value));
-          enqueue2(StructuredClone(value));
-        } else {
-          enqueue1(value);
-          enqueue2(value);
-        }
-        pump();
-      },
-      e => {
-        error1(e);
-        error2(e);
-      }
-    );
-  }
-
-  function maybeCancelSource() {
-    if (canceled1 && canceled2) {
-      reader.cancel([cancelReason1, cancelReason2]);
-    }
-  }
-}
-
-function TeeReadableStream2(stream, clone) {
-  const reader = stream.getReader();
-
   let enqueue1, enqueue2, close1, close2, error1, error2;
   let canceled1, cancelReason1, canceled2, cancelReason2;
   const branch1 = new ReadableStream({

From 782aec72ed707f394c64a1c07c81e26d2a78c2e3 Mon Sep 17 00:00:00 2001
From: Domenic Denicola 
Date: Thu, 19 Mar 2015 15:37:05 +0900
Subject: [PATCH 4/7] Remove SpeculativeTeeReadableByteStream for now

First step toward formalizing
---
 .../lib/readable-stream.js                    | 93 -------------------
 1 file changed, 93 deletions(-)

diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js
index 9c36aada1..a6d5b5109 100644
--- a/reference-implementation/lib/readable-stream.js
+++ b/reference-implementation/lib/readable-stream.js
@@ -553,96 +553,3 @@ function TeeReadableStream(stream, clone) {
     }
   }
 }
-
-
-function SpeculativeTeeReadableByteStream(stream, clone) {
-  const reader = stream.getReader({ feedBuffers: true });
-
-  let enqueue1, close1, error1, enqueue2, close2, error2;
-  let canceled1, cancelReason1, canceled2, cancelReason2;
-  const branch1 = new ReadableStream({
-    start(enqueue, close, error) {
-      [enqueue1, close1, error1] = [enqueue, close, error];
-    },
-    read: readIntoAndEnqueueInOther(enqueue2),
-    pull: readAndEnqueueInBoth,
-    cancel(reason) {
-      canceled1 = true;
-      cancelReason1 = reason;
-      maybeCancelSource();
-    }
-  });
-
-  const branch2 = new ReadableStream({
-    start(enqueue, close, error) {
-      [enqueue2, close2, error2] = [enqueue, close, error];
-    },
-    read: readIntoAndEnqueueInOther(enqueue1),
-    pull: readAndEnqueueInBoth,
-    cancel(reason) {
-      canceled2 = true;
-      cancelReason2 = reason;
-      maybeCancelSource();
-    }
-  });
-
-  return [branch1, branch2];
-
-  function readIntoAndEnqueueInOther(otherEnqueue) {
-    return (view, ready) => {
-      reader.read(view).then(
-        ({ value, done }) => {
-          if (done) {
-            close1();
-            close2();
-            return;
-          }
-
-          if (clone) {
-            otherEnqueue(StructuredClone(value));
-          } else {
-            otherEnqueue(value);
-          }
-
-          ready();
-        },
-        e => {
-          error1(e);
-          error2(e);
-        }
-      );
-    };
-  }
-
-  function readAndEnqueueInBoth() {
-    // Assuming FeedBufferReadableByteStreamReaders (or whatever) allow read() with no arguments still,
-    // which they probably should (?).
-    reader.read().then(
-      ({ value, done }) => {
-        if (done) {
-          close1();
-          close2();
-          return;
-        }
-
-        if (clone) {
-          enqueue1(StructuredClone(value));
-          enqueue2(StructuredClone(value));
-        } else {
-          enqueue1(value);
-          enqueue2(value);
-        }
-      },
-      e => {
-        error1(e);
-        error2(e);
-      }
-    );
-  }
-
-  function maybeCancelSource() {
-    if (canceled1 && canceled2) {
-      reader.cancel([cancelReason1, cancelReason2]);
-    }
-  }
-}

From c8223d9f4dc0ab052f87296946e0dd90c09dae45 Mon Sep 17 00:00:00 2001
From: Domenic Denicola 
Date: Thu, 19 Mar 2015 15:55:10 +0900
Subject: [PATCH 5/7] stash

---
 reference-implementation/lib/readable-stream.js | 6 +++++-
 reference-implementation/package.json           | 3 +++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js
index a6d5b5109..436619cef 100644
--- a/reference-implementation/lib/readable-stream.js
+++ b/reference-implementation/lib/readable-stream.js
@@ -494,8 +494,12 @@ function ShouldReadableStreamApplyBackpressure(stream) {
 }
 
 function TeeReadableStream(stream, clone) {
-  const reader = stream.getReader();
+  assert(IsReadableStream(stream) === true);
+  const reader = AcquireReadableStreamReader(stream);
 
+  const branch1 = new ReadableStream({
+    pull: readAndEnqueueInBoth
+  })
   let enqueue1, enqueue2, close1, close2, error1, error2;
   let canceled1, cancelReason1, canceled2, cancelReason2;
   const branch1 = new ReadableStream({
diff --git a/reference-implementation/package.json b/reference-implementation/package.json
index bf545b330..c918a8628 100644
--- a/reference-implementation/package.json
+++ b/reference-implementation/package.json
@@ -35,5 +35,8 @@
     "text-table": "^0.2.0",
     "traceur": "0.0.84",
     "traceur-runner": "^1.0.2"
+  },
+  "dependencies": {
+    "cyclonejs": "^1.1.1"
   }
 }

From 8f76e98dc217c4bd5c497e149abcce433ad53323 Mon Sep 17 00:00:00 2001
From: Domenic Denicola 
Date: Thu, 19 Mar 2015 18:58:25 +0900
Subject: [PATCH 6/7] Start taking care of edge cases, and using internals

---
 .../lib/readable-stream.js                    |  89 +++++----
 reference-implementation/test/brand-checks.js |   6 +
 .../test/readable-stream-tee.js               | 170 ++++++++++++++++++
 .../test/templated/readable-stream-empty.js   |   1 +
 4 files changed, 234 insertions(+), 32 deletions(-)
 create mode 100644 reference-implementation/test/readable-stream-tee.js

diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js
index 436619cef..169cbbd38 100644
--- a/reference-implementation/lib/readable-stream.js
+++ b/reference-implementation/lib/readable-stream.js
@@ -141,6 +141,14 @@ export default class ReadableStream {
       rejectPipeToPromise(reason);
     }
   }
+
+  tee() {
+    if (IsReadableStream(this) === false) {
+      throw new TypeError('ReadableStream.prototype.getReader can only be used on a ReadableStream');
+    }
+
+    return TeeReadableStream(this, false);
+  }
 }
 
 class ReadableStreamReader {
@@ -494,66 +502,83 @@ function ShouldReadableStreamApplyBackpressure(stream) {
 }
 
 function TeeReadableStream(stream, clone) {
+  // TODO: avoid accessing public .read()
+
   assert(IsReadableStream(stream) === true);
   const reader = AcquireReadableStreamReader(stream);
 
+  let canceled1 = false;
+  let cancelReason1 = undefined;
+  let canceled2 = false;
+  let cancelReason2 = undefined;
+  let closedOrErrored = false;
+
+  let cancelPromise_resolve;
+  const cancelPromise = new Promise((resolve, reject) => {
+    cancelPromise_resolve = resolve;
+  });
+
   const branch1 = new ReadableStream({
-    pull: readAndEnqueueInBoth
-  })
-  let enqueue1, enqueue2, close1, close2, error1, error2;
-  let canceled1, cancelReason1, canceled2, cancelReason2;
-  const branch1 = new ReadableStream({
-    start(enqueue, close, error) {
-      [enqueue1, close1, error1] = [enqueue, close, error];
-    },
     pull: readAndEnqueueInBoth,
     cancel(reason) {
       canceled1 = true;
       cancelReason1 = reason;
       maybeCancelSource();
+      return cancelPromise;
     }
   });
+
   const branch2 = new ReadableStream({
-    start(enqueue, close, error) {
-      [enqueue2, close2, error2] = [enqueue, close, error];
-    },
     pull: readAndEnqueueInBoth,
     cancel(reason) {
       canceled2 = true;
       cancelReason2 = reason;
       maybeCancelSource();
+      return cancelPromise;
+    }
+  });
+
+  reader._closedPromise.catch(e => {
+    if (!closedOrErrored) {
+      branch1._error(e);
+      branch2._error(e);
+      closedOrErrored = true;
     }
   });
 
   return [branch1, branch2];
 
   function readAndEnqueueInBoth() {
-    reader.read().then(
-      ({ value, done }) => {
-        if (done) {
-          close1();
-          close2();
-          return;
-        }
+    reader.read().then(({ value, done }) => {
+      if (done && !closedOrErrored) {
+        branch1._close();
+        branch2._close();
+        closedOrErrored = true;
+      }
 
-        if (clone) {
-          enqueue1(StructuredClone(value));
-          enqueue2(StructuredClone(value));
-        } else {
-          enqueue1(value);
-          enqueue2(value);
-        }
-      },
-      e => {
-        error1(e);
-        error2(e);
+      if (closedOrErrored) {
+        return;
       }
-    );
+
+      let value1 = value;
+      let value2 = value;
+      if (clone) {
+        value1 = StructuredClone(value);
+        value2 = StructuredClone(value);
+      }
+
+      if (canceled1 === false) {
+        branch1._enqueue(value1);
+      }
+      if (canceled2 === false) {
+        branch2._enqueue(value2);
+      }
+    });
   }
 
   function maybeCancelSource() {
-    if (canceled1 && canceled2) {
-      reader.cancel([cancelReason1, cancelReason2]);
+    if (canceled1 === true && canceled2 === true) {
+      cancelPromise_resolve(CancelReadableStream(stream, [cancelReason1, cancelReason2]));
     }
   }
 }
diff --git a/reference-implementation/test/brand-checks.js b/reference-implementation/test/brand-checks.js
index e98705efa..317c5f3db 100644
--- a/reference-implementation/test/brand-checks.js
+++ b/reference-implementation/test/brand-checks.js
@@ -145,6 +145,12 @@ test('ReadableStream.prototype.pipeTo works generically on its this and its argu
   t.doesNotThrow(() => ReadableStream.prototype.pipeTo.call(fakeReadableStream(), fakeWritableStream()));
 });
 
+test('ReadableStream.prototype.tee enforces a brand check', t => {
+  t.plan(2);
+  methodThrows(t, ReadableStream.prototype, 'tee', fakeReadableStream());
+  methodThrows(t, ReadableStream.prototype, 'tee', realWritableStream());
+});
+
 
 
 
diff --git a/reference-implementation/test/readable-stream-tee.js b/reference-implementation/test/readable-stream-tee.js
new file mode 100644
index 000000000..56d5c06fe
--- /dev/null
+++ b/reference-implementation/test/readable-stream-tee.js
@@ -0,0 +1,170 @@
+const test = require('tape-catch');
+
+import readableStreamToArray from './utils/readable-stream-to-array';
+
+test('ReadableStream teeing: rs.tee() returns an array of two ReadableStreams', t => {
+  const rs = new ReadableStream();
+
+  const result = rs.tee();
+
+  t.ok(Array.isArray(result), 'return value should be an array');
+  t.equal(result.length, 2, 'array should have length 2');
+  t.equal(result[0].constructor, ReadableStream, '0th element should be a ReadableStream');
+  t.equal(result[1].constructor, ReadableStream, '1st element should be a ReadableStream');
+  t.end();
+});
+
+test('ReadableStream teeing: should be able to read one branch to the end without affecting the other', t => {
+  t.plan(5);
+
+  const rs = new ReadableStream({
+    start(enqueue, close) {
+      enqueue('a');
+      enqueue('b');
+      close();
+    }
+  });
+
+  const [branch1, branch2] = rs.tee();
+  const [reader1, reader2] = [branch1.getReader(), branch2.getReader()];
+
+  reader1.closed.then(() => t.pass('branch1 should be closed'));
+  reader2.closed.then(() => t.fail('branch2 should not be closed'));
+
+  reader1.read().then(r => t.deepEqual(r, { value: 'a', done: false }, 'first chunk from branch1 should be correct'));
+  reader1.read().then(r => t.deepEqual(r, { value: 'b', done: false }, 'second chunk from branch1 should be correct'));
+  reader1.read().then(r => t.deepEqual(r, { value: undefined, done: true },
+    'third read() from branch1 should be done'));
+
+  reader2.read().then(r => t.deepEqual(r, { value: 'a', done: false }, 'first chunk from branch2 should be correct'));
+});
+
+test('ReadableStream teeing: values should be equal across each branch', t => {
+  t.plan(1);
+
+  const theObject = { the: 'test object' };
+  const rs = new ReadableStream({
+    start(enqueue) {
+      enqueue(theObject);
+    }
+  });
+
+  const [branch1, branch2] = rs.tee();
+  const [reader1, reader2] = [branch1.getReader(), branch2.getReader()];
+
+  Promise.all([reader1.read(), reader2.read()]).then(([{ value: value1 }, { value: value2 }]) => {
+    t.equal(value1, value2, 'the values should be equal');
+  });
+});
+
+test('ReadableStream teeing: errors in the source should propagate to both branches', t => {
+  t.plan(6);
+
+  const theError = new Error('boo!');
+  const rs = new ReadableStream({
+    start(enqueue, close, error) {
+      enqueue('a');
+      enqueue('b');
+    },
+    pull() {
+      throw theError;
+    }
+  });
+
+  const [branch1, branch2] = rs.tee();
+  const [reader1, reader2] = [branch1.getReader(), branch2.getReader()];
+
+  reader1.label = 'reader1';
+  reader2.label = 'reader2';
+
+  reader1.closed.catch(e => t.equal(e, theError, 'branch1 closed promise should reject with the error'));
+  reader2.closed.catch(e => t.equal(e, theError, 'branch2 closed promise should reject with the error'));
+
+  reader1.read().then(r => t.deepEqual(r, { value: 'a', done: false },
+    'should be able to read the first chunk in branch1'));
+
+  reader1.read().then(r => {
+    t.deepEqual(r, { value: 'b', done: false }, 'should be able to read the second chunk in branch1');
+
+    return reader2.read().then(
+      () => t.fail('once the root stream has errored, you should not be able to read from branch2'),
+      e => t.equal(e, theError, 'branch2 read() promise should reject with the error')
+    );
+  })
+  .then(() => {
+    return reader1.read().then(
+      () => t.fail('once the root stream has errored, you should not be able to read from branch1 either'),
+      e => t.equal(e, theError, 'branch1 read() promise should reject with the error')
+    );
+  })
+  .catch(e => t.error(e));
+});
+
+test('ReadableStream teeing: canceling branch1 should not impact branch2', t => {
+  t.plan(2);
+
+  const rs = new ReadableStream({
+    start(enqueue, close, error) {
+      enqueue('a');
+      enqueue('b');
+      close();
+    }
+  });
+
+  const [branch1, branch2] = rs.tee();
+  branch1.cancel();
+
+  readableStreamToArray(branch1).then(chunks => t.deepEqual(chunks, [], 'branch1 should have no chunks'));
+  readableStreamToArray(branch2).then(chunks => t.deepEqual(chunks, ['a', 'b'], 'branch2 should have two chunks'));
+});
+
+test('ReadableStream teeing: canceling branch2 should not impact branch1', t => {
+  t.plan(2);
+
+  const rs = new ReadableStream({
+    start(enqueue, close, error) {
+      enqueue('a');
+      enqueue('b');
+      close();
+    }
+  });
+
+  const [branch1, branch2] = rs.tee();
+  branch2.cancel();
+
+  readableStreamToArray(branch1).then(chunks => t.deepEqual(chunks, ['a', 'b'], 'branch1 should have two chunks'));
+  readableStreamToArray(branch2).then(chunks => t.deepEqual(chunks, [], 'branch2 should have no chunks'));
+});
+
+test('ReadableStream teeing: canceling both branches should aggregate the cancel reasons into an array', t => {
+  t.plan(1);
+
+  const reason1 = new Error('We\'re wanted men.');
+  const reason2 = new Error('I have the death sentence on twelve systems.');
+
+  const rs = new ReadableStream({
+    cancel(reason) {
+      t.deepEqual(reason, [reason1, reason2],
+        'the cancel reason should be an array containing those from the branches');
+    }
+  });
+
+  const [branch1, branch2] = rs.tee();
+  branch1.cancel(reason1);
+  branch2.cancel(reason2);
+});
+
+test('ReadableStream teeing: failing to cancel the original stream should cause cancel() to reject on branches', t => {
+  t.plan(2);
+
+  const theError = new Error('I\'ll be careful.');
+  const rs = new ReadableStream({
+    cancel() {
+      throw theError;
+    }
+  });
+
+  const [branch1, branch2] = rs.tee();
+  branch1.cancel().catch(e => t.equal(e, theError, 'branch1.cancel() should reject with the error'));
+  branch2.cancel().catch(e => t.equal(e, theError, 'branch2.cancel() should reject with the error'));
+});
diff --git a/reference-implementation/test/templated/readable-stream-empty.js b/reference-implementation/test/templated/readable-stream-empty.js
index 3c70a4984..b0559ae29 100644
--- a/reference-implementation/test/templated/readable-stream-empty.js
+++ b/reference-implementation/test/templated/readable-stream-empty.js
@@ -12,6 +12,7 @@ export default (label, factory) => {
     t.equal(typeof rs.getReader, 'function', 'has a getReader method');
     t.equal(typeof rs.pipeThrough, 'function', 'has a pipeThrough method');
     t.equal(typeof rs.pipeTo, 'function', 'has a pipeTo method');
+    t.equal(typeof rs.tee, 'function', 'has a tee method');
 
     t.end();
   });

From 43ada8cb11cac12c92f5bcb1423445d00369532b Mon Sep 17 00:00:00 2001
From: Domenic Denicola 
Date: Thu, 19 Mar 2015 22:07:10 +0900
Subject: [PATCH 7/7] Factor out ReadFromReadableStreamReader to avoid using
 public API

---
 .../lib/readable-stream.js                    | 70 ++++++++++---------
 1 file changed, 36 insertions(+), 34 deletions(-)

diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js
index 169cbbd38..3d89ec55b 100644
--- a/reference-implementation/lib/readable-stream.js
+++ b/reference-implementation/lib/readable-stream.js
@@ -212,37 +212,7 @@ class ReadableStreamReader {
         new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader'));
     }
 
-    if (this._state === 'closed') {
-      return Promise.resolve(CreateIterResultObject(undefined, true));
-    }
-
-    if (this._state === 'errored') {
-      return Promise.reject(this._storedError);
-    }
-
-    assert(this._ownerReadableStream !== undefined);
-    assert(this._ownerReadableStream._state === 'readable');
-
-    if (this._ownerReadableStream._queue.length > 0) {
-      const chunk = DequeueValue(this._ownerReadableStream._queue);
-
-      if (this._ownerReadableStream._closeRequested === true && this._ownerReadableStream._queue.length === 0) {
-        CloseReadableStream(this._ownerReadableStream);
-      } else {
-        CallReadableStreamPull(this._ownerReadableStream);
-      }
-
-      return Promise.resolve(CreateIterResultObject(chunk, false));
-    } else {
-      const readRequest = {};
-      readRequest.promise = new Promise((resolve, reject) => {
-        readRequest._resolve = resolve;
-        readRequest._reject = reject;
-      });
-
-      this._readRequests.push(readRequest);
-      return readRequest.promise;
-    }
+    return ReadFromReadableStreamReader(this);
   }
 
   releaseLock() {
@@ -450,6 +420,40 @@ function IsReadableStreamReader(x) {
   return true;
 }
 
+function ReadFromReadableStreamReader(reader) {
+  if (reader._state === 'closed') {
+    return Promise.resolve(CreateIterResultObject(undefined, true));
+  }
+
+  if (reader._state === 'errored') {
+    return Promise.reject(reader._storedError);
+  }
+
+  assert(reader._ownerReadableStream !== undefined);
+  assert(reader._ownerReadableStream._state === 'readable');
+
+  if (reader._ownerReadableStream._queue.length > 0) {
+    const chunk = DequeueValue(reader._ownerReadableStream._queue);
+
+    if (reader._ownerReadableStream._closeRequested === true && reader._ownerReadableStream._queue.length === 0) {
+      CloseReadableStream(reader._ownerReadableStream);
+    } else {
+      CallReadableStreamPull(reader._ownerReadableStream);
+    }
+
+    return Promise.resolve(CreateIterResultObject(chunk, false));
+  } else {
+    const readRequest = {};
+    readRequest.promise = new Promise((resolve, reject) => {
+      readRequest._resolve = resolve;
+      readRequest._reject = reject;
+    });
+
+    reader._readRequests.push(readRequest);
+    return readRequest.promise;
+  }
+}
+
 function ReleaseReadableStreamReader(reader) {
   assert(reader._ownerReadableStream !== undefined);
 
@@ -502,8 +506,6 @@ function ShouldReadableStreamApplyBackpressure(stream) {
 }
 
 function TeeReadableStream(stream, clone) {
-  // TODO: avoid accessing public .read()
-
   assert(IsReadableStream(stream) === true);
   const reader = AcquireReadableStreamReader(stream);
 
@@ -549,7 +551,7 @@ function TeeReadableStream(stream, clone) {
   return [branch1, branch2];
 
   function readAndEnqueueInBoth() {
-    reader.read().then(({ value, done }) => {
+    ReadFromReadableStreamReader(reader).then(({ value, done }) => {
       if (done && !closedOrErrored) {
         branch1._close();
         branch2._close();