diff --git a/index.bs b/index.bs
index f7db4bebe..8b608d3cf 100644
--- a/index.bs
+++ b/index.bs
@@ -270,9 +270,9 @@ Instances of ReadableStream
are created with the internal slots des
closed
getter
ReadableStream
are created with the internal slots des
1. Set *this*@[[underlyingSource]] to _underlyingSource_.
1. Set *this*@[[queue]] to a new empty List.
1. Set *this*@[[state]] to "readable".
- 1. Set *this*@[[started]], *this*@[[draining]], and *this*@[[pullScheduled]] to *false*.
+ 1. Set *this*@[[started]], *this*@[[closeRequested]], and *this*@[[pullScheduled]] to *false*.
1. Set *this*@[[reader]], *this*@[[pullingPromise]], and *this*@[[storedError]] to *undefined*.
1. Set *this*@[[enqueue]] to CreateReadableStreamEnqueueFunction(*this*).
1. Set *this*@[[close]] to CreateReadableStreamCloseFunction(*this*).
@@ -627,7 +627,7 @@ Instances of ReadableStreamReader
are created with the internal slo
1. Assert: *this*@[[ownerReadableStream]]@[[state]] is "readable".
1. If *this*@[[ownerReadableStream]]@[[queue]] is not empty,
1. Let _chunk_ be DequeueValue(*this*@[[ownerReadableStream]]@[[queue]]).
- 1. If *this*@[[ownerReadableStream]]@[[draining]] is *true* and *this*@[[ownerReadableStream]]@[[queue]] is now empty, call-with-rethrow CloseReadableStream(*this*@[[ownerReadableStream]]).
+ 1. If *this*@[[ownerReadableStream]]@[[closeRequested]] is *true* and *this*@[[ownerReadableStream]]@[[queue]] is now empty, call-with-rethrow CloseReadableStream(*this*@[[ownerReadableStream]]).
1. Otherwise, call-with-rethrow CallReadableStreamPull(*this*@[[ownerReadableStream]]).
1. Return a new promise resolved with CreateIterResultObject(_chunk_, *false*).
1. Otherwise,
@@ -673,7 +673,7 @@ Instances of ReadableStreamReader
are created with the internal slo
- 1. If _stream_@[[draining]] is *true* or _stream_@[[started]] is *false* or _stream_@[[state]] is "closed" or _stream_@[[state]] is "errored" or _stream_@[[pullScheduled]] is *true*, return *undefined*. + 1. If _stream_@[[closeRequested]] is *true* or _stream_@[[started]] is *false* or _stream_@[[state]] is "closed" or _stream_@[[state]] is "errored" or _stream_@[[pullScheduled]] is *true*, return *undefined*. 1. If _stream_@[[pullingPromise]] is not *undefined*, 1. Set _stream_@[[pullScheduled]] to *true*. 1. Upon fulfillment of _stream_@[[pullingPromise]], @@ -728,11 +728,19 @@ A Readable Stream Close Function is a built-in anonymous function of stream, that performs the following steps:- 1. If _stream_@[[state]] is not "readable", return *undefined*. + 1. If _stream_@[[closeRequested]] is *true*, throw a *TypeError* exception. + 1. If _stream_@[[state]] is "errored", throw a *TypeError* exception. + 1. If _stream_@[[state]] is "closed", return *undefined*. + 1. Set _stream_@[[closeRequested]] to *true*. 1. If _stream_@[[queue]] is empty, return CloseReadableStream(_stream_). - 1. Set _stream_@[[draining]] to *true*.++ The case where stream@\[[state]] is+"closed"
, but stream@\[[closeRequested]] is +false , will happen if the stream was closed without this close function ever being called: i.e., + if the stream was closed by a call tostream.cancel()
. +CreateReadableStreamEnqueueFunction ( stream )
@@ -745,7 +753,7 @@ closing over a variable stream, that performs the following steps:1. If _stream_@[[state]] is "errored", throw _stream_@[[storedError]]. 1. If _stream_@[[state]] is "closed", throw a *TypeError* exception. - 1. If _stream_@[[draining]] is *true*, throw a *TypeError* exception. + 1. If _stream_@[[closeRequested]] is *true*, throw a *TypeError* exception. 1. If IsReadableStreamLocked(_stream_) is *true* and _stream_@[[reader]]@[[readRequests]] is not empty, 1. Let _readRequestPromise_ be the first element of _stream_@[[reader]]@[[readRequests]]. 1. Remove _readRequestPromise_ from _stream_@[[reader]]@[[readRequests]], shifting all other elements downward (so that the second becomes the first, and so on). @@ -785,7 +793,7 @@ A Readable Stream Error Function is a built-in anonymous function of a variable stream, that performs the following steps:- 1. If _stream_@[[state]] is not "readable" return *undefined*. + 1. If _stream_@[[state]] is not "readable", throw a *TypeError* exception. 1. Let _stream_@[[queue]] be a new empty List. 1. Set _stream_@[[storedError]] to _e_. 1. Set _stream_@[[state]] to "errored". diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 50f5baced..3d89ec55b 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -8,7 +8,7 @@ export default class ReadableStream { this._queue = []; this._state = 'readable'; this._started = false; - this._draining = false; + this._closeRequested = false; this._pullScheduled = false; this._reader = undefined; this._pullingPromise = undefined; @@ -141,6 +141,14 @@ export default class ReadableStream { rejectPipeToPromise(reason); } } + + tee() { + if (IsReadableStream(this) === false) { + throw new TypeError('ReadableStream.prototype.getReader can only be used on a ReadableStream'); + } + + return TeeReadableStream(this, false); + } } class ReadableStreamReader { @@ -204,37 +212,7 @@ class ReadableStreamReader { new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader')); } - if (this._state === 'closed') { - return Promise.resolve(CreateIterResultObject(undefined, true)); - } - - if (this._state === 'errored') { - return Promise.reject(this._storedError); - } - - assert(this._ownerReadableStream !== undefined); - assert(this._ownerReadableStream._state === 'readable'); - - if (this._ownerReadableStream._queue.length > 0) { - const chunk = DequeueValue(this._ownerReadableStream._queue); - - if (this._ownerReadableStream._draining === true && this._ownerReadableStream._queue.length === 0) { - CloseReadableStream(this._ownerReadableStream); - } else { - CallReadableStreamPull(this._ownerReadableStream); - } - - return Promise.resolve(CreateIterResultObject(chunk, false)); - } else { - const readRequest = {}; - readRequest.promise = new Promise((resolve, reject) => { - readRequest._resolve = resolve; - readRequest._reject = reject; - }); - - this._readRequests.push(readRequest); - return readRequest.promise; - } + return ReadFromReadableStreamReader(this); } releaseLock() { @@ -259,7 +237,7 @@ function AcquireReadableStreamReader(stream) { } function CallReadableStreamPull(stream) { - if (stream._draining === true || stream._started === false || + if (stream._closeRequested === true || stream._started === false || stream._state === 'closed' || stream._state === 'errored' || stream._pullScheduled === true) { return undefined; @@ -317,15 +295,23 @@ function CloseReadableStream(stream) { function CreateReadableStreamCloseFunction(stream) { return () => { - if (stream._state !== 'readable') { + if (stream._closeRequested === true) { + throw new TypeError('The stream has already been closed; do not close it again!'); + } + if (stream._state === 'errored') { + throw new TypeError('The stream is in an errored state and cannot be closed'); + } + + if (stream._state === 'closed') { + // This will happen if the stream was closed without close() being called, i.e. by a call to stream.cancel() return undefined; } + stream._closeRequested = true; + if (stream._queue.length === 0) { return CloseReadableStream(stream); } - - stream._draining = true; }; } @@ -339,7 +325,7 @@ function CreateReadableStreamEnqueueFunction(stream) { throw new TypeError('stream is closed'); } - if (stream._draining === true) { + if (stream._closeRequested === true) { throw new TypeError('stream is draining'); } @@ -387,7 +373,7 @@ function CreateReadableStreamEnqueueFunction(stream) { function CreateReadableStreamErrorFunction(stream) { return e => { if (stream._state !== 'readable') { - return; + throw new TypeError(`The stream is ${stream._state} and so cannot be errored`); } stream._queue = []; @@ -434,6 +420,40 @@ function IsReadableStreamReader(x) { return true; } +function ReadFromReadableStreamReader(reader) { + if (reader._state === 'closed') { + return Promise.resolve(CreateIterResultObject(undefined, true)); + } + + if (reader._state === 'errored') { + return Promise.reject(reader._storedError); + } + + assert(reader._ownerReadableStream !== undefined); + assert(reader._ownerReadableStream._state === 'readable'); + + if (reader._ownerReadableStream._queue.length > 0) { + const chunk = DequeueValue(reader._ownerReadableStream._queue); + + if (reader._ownerReadableStream._closeRequested === true && reader._ownerReadableStream._queue.length === 0) { + CloseReadableStream(reader._ownerReadableStream); + } else { + CallReadableStreamPull(reader._ownerReadableStream); + } + + return Promise.resolve(CreateIterResultObject(chunk, false)); + } else { + const readRequest = {}; + readRequest.promise = new Promise((resolve, reject) => { + readRequest._resolve = resolve; + readRequest._reject = reject; + }); + + reader._readRequests.push(readRequest); + return readRequest.promise; + } +} + function ReleaseReadableStreamReader(reader) { assert(reader._ownerReadableStream !== undefined); @@ -484,3 +504,83 @@ function ShouldReadableStreamApplyBackpressure(stream) { return shouldApplyBackpressure; } + +function TeeReadableStream(stream, clone) { + assert(IsReadableStream(stream) === true); + const reader = AcquireReadableStreamReader(stream); + + let canceled1 = false; + let cancelReason1 = undefined; + let canceled2 = false; + let cancelReason2 = undefined; + let closedOrErrored = false; + + let cancelPromise_resolve; + const cancelPromise = new Promise((resolve, reject) => { + cancelPromise_resolve = resolve; + }); + + const branch1 = new ReadableStream({ + pull: readAndEnqueueInBoth, + cancel(reason) { + canceled1 = true; + cancelReason1 = reason; + maybeCancelSource(); + return cancelPromise; + } + }); + + const branch2 = new ReadableStream({ + pull: readAndEnqueueInBoth, + cancel(reason) { + canceled2 = true; + cancelReason2 = reason; + maybeCancelSource(); + return cancelPromise; + } + }); + + reader._closedPromise.catch(e => { + if (!closedOrErrored) { + branch1._error(e); + branch2._error(e); + closedOrErrored = true; + } + }); + + return [branch1, branch2]; + + function readAndEnqueueInBoth() { + ReadFromReadableStreamReader(reader).then(({ value, done }) => { + if (done && !closedOrErrored) { + branch1._close(); + branch2._close(); + closedOrErrored = true; + } + + if (closedOrErrored) { + return; + } + + let value1 = value; + let value2 = value; + if (clone) { + value1 = StructuredClone(value); + value2 = StructuredClone(value); + } + + if (canceled1 === false) { + branch1._enqueue(value1); + } + if (canceled2 === false) { + branch2._enqueue(value2); + } + }); + } + + function maybeCancelSource() { + if (canceled1 === true && canceled2 === true) { + cancelPromise_resolve(CancelReadableStream(stream, [cancelReason1, cancelReason2])); + } + } +} diff --git a/reference-implementation/package.json b/reference-implementation/package.json index bf545b330..c918a8628 100644 --- a/reference-implementation/package.json +++ b/reference-implementation/package.json @@ -35,5 +35,8 @@ "text-table": "^0.2.0", "traceur": "0.0.84", "traceur-runner": "^1.0.2" + }, + "dependencies": { + "cyclonejs": "^1.1.1" } } diff --git a/reference-implementation/test/bad-underlying-sources.js b/reference-implementation/test/bad-underlying-sources.js index c24334ede..3f3e02874 100644 --- a/reference-implementation/test/bad-underlying-sources.js +++ b/reference-implementation/test/bad-underlying-sources.js @@ -321,3 +321,102 @@ test('Underlying source: strategy.size returning +Infinity', t => { rs.getReader().closed.catch(e => t.equal(e, theError, 'closed should reject with the error')); }); + +test('Underlying source: calling close twice on an empty stream should throw the second time', t => { + t.plan(2); + + new ReadableStream({ + start(enqueue, close) { + close(); + t.throws(close, /TypeError/, 'second call to close should throw a TypeError'); + } + }) + .getReader().closed.then(() => t.pass('closed should fulfill')); +}); + +test('Underlying source: calling close twice on a non-empty stream should throw the second time', t => { + t.plan(3); + + const reader = new ReadableStream({ + start(enqueue, close) { + enqueue('a'); + close(); + t.throws(close, /TypeError/, 'second call to close should throw a TypeError'); + } + }) + .getReader(); + + reader.read().then(r => t.deepEqual(r, { value: 'a', done: false }, 'read() should read the enqueued chunk')); + reader.closed.then(() => t.pass('closed should fulfill')); +}); + +test('Underlying source: calling close on an empty canceled stream should not throw', t => { + t.plan(2); + + let doClose; + const rs = new ReadableStream({ + start(enqueue, close) { + doClose = close; + } + }); + + rs.cancel(); + t.doesNotThrow(doClose, 'calling close after canceling should not throw anything'); + + rs.getReader().closed.then(() => t.pass('closed should fulfill')); +}); + +test('Underlying source: calling close on a non-empty canceled stream should not throw', t => { + t.plan(2); + + let doClose; + const rs = new ReadableStream({ + start(enqueue, close) { + enqueue('a'); + doClose = close; + } + }); + + rs.cancel(); + t.doesNotThrow(doClose, 'calling close after canceling should not throw anything'); + + rs.getReader().closed.then(() => t.pass('closed should fulfill')); +}); + +test('Underlying source: calling close after error should throw', t => { + t.plan(2); + + const theError = new Error('boo'); + new ReadableStream({ + start(enqueue, close, error) { + error(theError); + t.throws(close, /TypeError/, 'call to close should throw a TypeError'); + } + }) + .getReader().closed.catch(e => t.equal(e, theError, 'closed should reject with the error')); +}); + +test('Underlying source: calling error twice should throw the second time', t => { + t.plan(2); + + const theError = new Error('boo'); + new ReadableStream({ + start(enqueue, close, error) { + error(theError); + t.throws(error, /TypeError/, 'second call to error should throw a TypeError'); + } + }) + .getReader().closed.catch(e => t.equal(e, theError, 'closed should reject with the error')); +}); + +test('Underlying source: calling error after close should throw', t => { + t.plan(2); + + new ReadableStream({ + start(enqueue, close, error) { + close(); + t.throws(error, /TypeError/, 'call to error should throw a TypeError'); + } + }) + .getReader().closed.then(() => t.pass('closed should fulfill')); +}); diff --git a/reference-implementation/test/brand-checks.js b/reference-implementation/test/brand-checks.js index e98705efa..317c5f3db 100644 --- a/reference-implementation/test/brand-checks.js +++ b/reference-implementation/test/brand-checks.js @@ -145,6 +145,12 @@ test('ReadableStream.prototype.pipeTo works generically on its this and its argu t.doesNotThrow(() => ReadableStream.prototype.pipeTo.call(fakeReadableStream(), fakeWritableStream())); }); +test('ReadableStream.prototype.tee enforces a brand check', t => { + t.plan(2); + methodThrows(t, ReadableStream.prototype, 'tee', fakeReadableStream()); + methodThrows(t, ReadableStream.prototype, 'tee', realWritableStream()); +}); + diff --git a/reference-implementation/test/readable-stream-tee.js b/reference-implementation/test/readable-stream-tee.js new file mode 100644 index 000000000..56d5c06fe --- /dev/null +++ b/reference-implementation/test/readable-stream-tee.js @@ -0,0 +1,170 @@ +const test = require('tape-catch'); + +import readableStreamToArray from './utils/readable-stream-to-array'; + +test('ReadableStream teeing: rs.tee() returns an array of two ReadableStreams', t => { + const rs = new ReadableStream(); + + const result = rs.tee(); + + t.ok(Array.isArray(result), 'return value should be an array'); + t.equal(result.length, 2, 'array should have length 2'); + t.equal(result[0].constructor, ReadableStream, '0th element should be a ReadableStream'); + t.equal(result[1].constructor, ReadableStream, '1st element should be a ReadableStream'); + t.end(); +}); + +test('ReadableStream teeing: should be able to read one branch to the end without affecting the other', t => { + t.plan(5); + + const rs = new ReadableStream({ + start(enqueue, close) { + enqueue('a'); + enqueue('b'); + close(); + } + }); + + const [branch1, branch2] = rs.tee(); + const [reader1, reader2] = [branch1.getReader(), branch2.getReader()]; + + reader1.closed.then(() => t.pass('branch1 should be closed')); + reader2.closed.then(() => t.fail('branch2 should not be closed')); + + reader1.read().then(r => t.deepEqual(r, { value: 'a', done: false }, 'first chunk from branch1 should be correct')); + reader1.read().then(r => t.deepEqual(r, { value: 'b', done: false }, 'second chunk from branch1 should be correct')); + reader1.read().then(r => t.deepEqual(r, { value: undefined, done: true }, + 'third read() from branch1 should be done')); + + reader2.read().then(r => t.deepEqual(r, { value: 'a', done: false }, 'first chunk from branch2 should be correct')); +}); + +test('ReadableStream teeing: values should be equal across each branch', t => { + t.plan(1); + + const theObject = { the: 'test object' }; + const rs = new ReadableStream({ + start(enqueue) { + enqueue(theObject); + } + }); + + const [branch1, branch2] = rs.tee(); + const [reader1, reader2] = [branch1.getReader(), branch2.getReader()]; + + Promise.all([reader1.read(), reader2.read()]).then(([{ value: value1 }, { value: value2 }]) => { + t.equal(value1, value2, 'the values should be equal'); + }); +}); + +test('ReadableStream teeing: errors in the source should propagate to both branches', t => { + t.plan(6); + + const theError = new Error('boo!'); + const rs = new ReadableStream({ + start(enqueue, close, error) { + enqueue('a'); + enqueue('b'); + }, + pull() { + throw theError; + } + }); + + const [branch1, branch2] = rs.tee(); + const [reader1, reader2] = [branch1.getReader(), branch2.getReader()]; + + reader1.label = 'reader1'; + reader2.label = 'reader2'; + + reader1.closed.catch(e => t.equal(e, theError, 'branch1 closed promise should reject with the error')); + reader2.closed.catch(e => t.equal(e, theError, 'branch2 closed promise should reject with the error')); + + reader1.read().then(r => t.deepEqual(r, { value: 'a', done: false }, + 'should be able to read the first chunk in branch1')); + + reader1.read().then(r => { + t.deepEqual(r, { value: 'b', done: false }, 'should be able to read the second chunk in branch1'); + + return reader2.read().then( + () => t.fail('once the root stream has errored, you should not be able to read from branch2'), + e => t.equal(e, theError, 'branch2 read() promise should reject with the error') + ); + }) + .then(() => { + return reader1.read().then( + () => t.fail('once the root stream has errored, you should not be able to read from branch1 either'), + e => t.equal(e, theError, 'branch1 read() promise should reject with the error') + ); + }) + .catch(e => t.error(e)); +}); + +test('ReadableStream teeing: canceling branch1 should not impact branch2', t => { + t.plan(2); + + const rs = new ReadableStream({ + start(enqueue, close, error) { + enqueue('a'); + enqueue('b'); + close(); + } + }); + + const [branch1, branch2] = rs.tee(); + branch1.cancel(); + + readableStreamToArray(branch1).then(chunks => t.deepEqual(chunks, [], 'branch1 should have no chunks')); + readableStreamToArray(branch2).then(chunks => t.deepEqual(chunks, ['a', 'b'], 'branch2 should have two chunks')); +}); + +test('ReadableStream teeing: canceling branch2 should not impact branch1', t => { + t.plan(2); + + const rs = new ReadableStream({ + start(enqueue, close, error) { + enqueue('a'); + enqueue('b'); + close(); + } + }); + + const [branch1, branch2] = rs.tee(); + branch2.cancel(); + + readableStreamToArray(branch1).then(chunks => t.deepEqual(chunks, ['a', 'b'], 'branch1 should have two chunks')); + readableStreamToArray(branch2).then(chunks => t.deepEqual(chunks, [], 'branch2 should have no chunks')); +}); + +test('ReadableStream teeing: canceling both branches should aggregate the cancel reasons into an array', t => { + t.plan(1); + + const reason1 = new Error('We\'re wanted men.'); + const reason2 = new Error('I have the death sentence on twelve systems.'); + + const rs = new ReadableStream({ + cancel(reason) { + t.deepEqual(reason, [reason1, reason2], + 'the cancel reason should be an array containing those from the branches'); + } + }); + + const [branch1, branch2] = rs.tee(); + branch1.cancel(reason1); + branch2.cancel(reason2); +}); + +test('ReadableStream teeing: failing to cancel the original stream should cause cancel() to reject on branches', t => { + t.plan(2); + + const theError = new Error('I\'ll be careful.'); + const rs = new ReadableStream({ + cancel() { + throw theError; + } + }); + + const [branch1, branch2] = rs.tee(); + branch1.cancel().catch(e => t.equal(e, theError, 'branch1.cancel() should reject with the error')); + branch2.cancel().catch(e => t.equal(e, theError, 'branch2.cancel() should reject with the error')); +}); diff --git a/reference-implementation/test/readable-stream.js b/reference-implementation/test/readable-stream.js index ca256790a..a1fe45925 100644 --- a/reference-implementation/test/readable-stream.js +++ b/reference-implementation/test/readable-stream.js @@ -38,57 +38,6 @@ test('ReadableStream: if pull rejects, it should error the stream', t => { }); }); -test('ReadableStream: calling close twice should be a no-op', t => { - t.plan(2); - - new ReadableStream({ - start(enqueue, close) { - close(); - t.doesNotThrow(close); - } - }) - .getReader().closed.then(() => t.pass('closed should fulfill')); -}); - -test('ReadableStream: calling error twice should be a no-op', t => { - t.plan(2); - - const theError = new Error('boo!'); - const error2 = new Error('not me!'); - new ReadableStream({ - start(enqueue, close, error) { - error(theError); - t.doesNotThrow(() => error(error2)); - } - }) - .getReader().closed.catch(e => t.equal(e, theError, 'closed should reject with the first error')); -}); - -test('ReadableStream: calling error after close should be a no-op', t => { - t.plan(2); - - new ReadableStream({ - start(enqueue, close, error) { - close(); - t.doesNotThrow(error); - } - }) - .getReader().closed.then(() => t.pass('closed should fulfill')); -}); - -test('ReadableStream: calling close after error should be a no-op', t => { - t.plan(2); - - const theError = new Error('boo!'); - new ReadableStream({ - start(enqueue, close, error) { - error(theError); - t.doesNotThrow(close); - } - }) - .getReader().closed.catch(e => t.equal(e, theError, 'closed should reject with the first error')); -}); - test('ReadableStream: should only call pull once upon starting the stream', t => { t.plan(2); diff --git a/reference-implementation/test/templated/readable-stream-empty.js b/reference-implementation/test/templated/readable-stream-empty.js index 3c70a4984..b0559ae29 100644 --- a/reference-implementation/test/templated/readable-stream-empty.js +++ b/reference-implementation/test/templated/readable-stream-empty.js @@ -12,6 +12,7 @@ export default (label, factory) => { t.equal(typeof rs.getReader, 'function', 'has a getReader method'); t.equal(typeof rs.pipeThrough, 'function', 'has a pipeThrough method'); t.equal(typeof rs.pipeTo, 'function', 'has a pipeTo method'); + t.equal(typeof rs.tee, 'function', 'has a tee method'); t.end(); });