From 1a5c51beb2f36d26dcd6ec8ae60ba88c84379b41 Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Mon, 9 Mar 2015 15:18:09 +0900 Subject: [PATCH] Second pass at async read design Based on discussions in #253. The key differences here from the previous async read() commits are: - ReadableStreams no longer have read() methods directly; those only exist on readers. This drastically simplifies the stream/reader interaction, and also allows the possibility of different types of readers which have different reading behavior. - read() promises fulfill with { value, done } instead of using an EOS sentinel value. This avoids a number of problems, and also provides a mechanism by which readable byte streams can smuggle out "unused" buffers given to them (using { value: zeroLengthViewOntoBuffer, done: true }). Another new semantic worth mentioning is that you cannot release a reader if the reader has read()s pending; doing so will throw. This slightly complicates the pipe algorithm in the { preventCancel: true } case. This commit also adds some new infrastructure for _templated tests_, and ports some portion of the existing tests there. This is our solution for #217 and #264. Finally, we re-merge all related code into a single readable-stream.js file, as the setup with the three separate files (readable-stream.js, exclusive-stream-reader.js, and readable-stream-abstract-ops.js) was problematic in causing circular dependencies. --- .../lib/readable-stream-abstract-ops.js | 233 ------ .../lib/readable-stream.js | 453 +++++++++-- reference-implementation/run-tests.js | 28 +- .../test/bad-underlying-sources.js | 40 +- reference-implementation/test/brand-checks.js | 77 +- .../test/count-queuing-strategy.js | 109 +-- reference-implementation/test/pipe-through.js | 3 +- .../test/pipe-to-options.js | 94 +-- reference-implementation/test/pipe-to.js | 179 +++-- .../test/readable-stream-cancel.js | 100 +-- .../test/readable-stream-reader.js | 208 +++++ .../test/readable-stream-templated.js | 117 +++ .../test/readable-stream.js | 720 ++++++++---------- .../readable-stream-closed-reader.js | 50 ++ .../test/templated/readable-stream-closed.js | 39 + .../templated/readable-stream-empty-reader.js | 134 ++++ .../test/templated/readable-stream-empty.js | 21 + .../readable-stream-errored-reader.js | 32 + .../readable-stream-errored-sync-only.js | 29 + .../test/templated/readable-stream-errored.js | 17 + ...eadable-stream-two-chunks-closed-reader.js | 112 +++ .../readable-stream-two-chunks-open-reader.js | 52 ++ .../test/transform-stream-errors.js | 45 +- .../test/transform-stream.js | 129 ++-- .../test/utils/readable-stream-to-array.js | 8 +- 25 files changed, 1861 insertions(+), 1168 deletions(-) delete mode 100644 reference-implementation/lib/readable-stream-abstract-ops.js create mode 100644 reference-implementation/test/readable-stream-reader.js create mode 100644 reference-implementation/test/readable-stream-templated.js create mode 100644 reference-implementation/test/templated/readable-stream-closed-reader.js create mode 100644 reference-implementation/test/templated/readable-stream-closed.js create mode 100644 reference-implementation/test/templated/readable-stream-empty-reader.js create mode 100644 reference-implementation/test/templated/readable-stream-empty.js create mode 100644 reference-implementation/test/templated/readable-stream-errored-reader.js create mode 100644 reference-implementation/test/templated/readable-stream-errored-sync-only.js create mode 100644 reference-implementation/test/templated/readable-stream-errored.js create mode 100644 reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js create mode 100644 reference-implementation/test/templated/readable-stream-two-chunks-open-reader.js diff --git a/reference-implementation/lib/readable-stream-abstract-ops.js b/reference-implementation/lib/readable-stream-abstract-ops.js deleted file mode 100644 index eb3b56d7f..000000000 --- a/reference-implementation/lib/readable-stream-abstract-ops.js +++ /dev/null @@ -1,233 +0,0 @@ -const assert = require('assert'); -import { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize } from './queue-with-sizes'; -import { PromiseInvokeOrNoop, typeIsObject } from './helpers'; - -export const ReadableStreamEOS = Symbol('ReadableStream.EOS'); - -export function CallReadableStreamPull(stream) { - if (stream._draining === true || stream._started === false || - stream._state === 'closed' || stream._state === 'errored' || - stream._pullScheduled === true) { - return undefined; - } - - if (stream._pullingPromise !== undefined) { - stream._pullScheduled = true; - stream._pullingPromise.then(() => { - stream._pullScheduled = false; - CallReadableStreamPull(stream); - }); - return undefined; - } - - const shouldApplyBackpressure = ShouldReadableStreamApplyBackpressure(stream); - if (shouldApplyBackpressure === true) { - return undefined; - } - - stream._pullingPromise = PromiseInvokeOrNoop(stream._underlyingSource, 'pull', [stream._enqueue, stream._close]); - stream._pullingPromise.then( - () => { stream._pullingPromise = undefined; }, - e => { stream._error(e); } - ); - - return undefined; -} - -export function CancelReadableStream(stream, reason) { - if (stream._state === 'closed' || stream._state === 'errored') { - return stream._closedPromise; - } - - stream._queue = []; - CloseReadableStream(stream); - - const sourceCancelPromise = PromiseInvokeOrNoop(stream._underlyingSource, 'cancel', [reason]); - return sourceCancelPromise.then(() => undefined); -} - -function CloseReadableStream(stream) { - if (stream._readPromisePending === true) { - stream._resolveReadPromise(ReadableStreamEOS); - stream._readPromisePending = false; - } - - stream._resolveClosedPromise(undefined); - - stream._state = 'closed'; - - return undefined; -} - -export function CreateReadableStreamCloseFunction(stream) { - return () => { - if (stream._state === 'readable') { - // TODO: refactor draining to a 'close' readRecord, like WritableStream uses!? - if (stream._queue.length === 0) { - CloseReadableStream(stream); - } else { - stream._draining = true; - } - } - }; -} - -export function CreateReadableStreamEnqueueFunction(stream) { - return chunk => { - if (stream._state === 'errored') { - throw stream._storedError; - } - - if (stream._state === 'closed') { - throw new TypeError('stream is closed'); - } - - if (stream._draining === true) { - throw new TypeError('stream is draining'); - } - - EnqueueIntoReadableStream(stream, chunk); - - const shouldApplyBackpressure = ShouldReadableStreamApplyBackpressure(stream); - if (shouldApplyBackpressure === true) { - return false; - } - return true; - }; -} - -export function CreateReadableStreamErrorFunction(stream) { - return e => { - if (stream._state === 'closed' || stream._state === 'errored') { - return; - } - - assert(stream._state === 'readable', `stream state ${stream._state} is invalid`); - - stream._queue = []; - - if (stream._readPromisePending === true) { - stream._rejectReadPromise(e); - } - - stream._rejectClosedPromise(e); - - stream._storedError = e; - stream._state = 'errored'; - - return undefined; - }; -} - -function EnqueueIntoReadableStream(stream, chunk) { - if (stream._readPromisePending === true) { - // read() was called and not yet fulfilled; we can skip the queue and put the chunk there - assert(stream._readPromise !== undefined); - - stream._resolveReadPromise(chunk); - stream._readPromisePending = false; - - // Don't forget to (possibly) call pull, even though the queue size doesn't change - stream._readPromise.then(() => CallReadableStreamPull(stream)); - return undefined; - } - - let chunkSize = 1; - - let strategy; - try { - strategy = stream._underlyingSource.strategy; - } catch (strategyE) { - stream._error(strategyE); - throw strategyE; - } - - if (strategy !== undefined) { - try { - chunkSize = strategy.size(chunk); - } catch (chunkSizeE) { - stream._error(chunkSizeE); - throw chunkSizeE; - } - } - - try { - EnqueueValueWithSize(stream._queue, chunk, chunkSize); - } catch (enqueueE) { - stream._error(enqueueE); - throw enqueueE; - } -} - -export function IsReadableStream(x) { - if (!typeIsObject(x)) { - return false; - } - - if (!Object.prototype.hasOwnProperty.call(x, '_underlyingSource')) { - return false; - } - - return true; -} - -export function ReadFromReadableStream(stream) { - if (stream._state === 'errored') { - return Promise.reject(stream._storedError); - } - - if (stream._state === 'closed') { - return Promise.resolve(ReadableStreamEOS); - } - - assert(stream._state === 'readable', `stream state ${stream._state} is invalid`); - - if (stream._readPromise !== undefined) { - return Promise.reject(new TypeError('A concurrent read is already in progress for this stream')); - } - - stream._initReadPromise(); - stream._readPromisePending = true; - - if (stream._queue.length > 0) { - const chunk = DequeueValue(stream._queue); - stream._resolveReadPromise(chunk); - stream._readPromisePending = false; - - if (stream._queue.length === 0 && stream._draining === true) { - CloseReadableStream(stream); - } - } - - CallReadableStreamPull(stream); - - stream._readPromise.then(() => { - stream._readPromise = undefined; - }); - - return stream._readPromise; -} - -export function ShouldReadableStreamApplyBackpressure(stream) { - const queueSize = GetTotalQueueSize(stream._queue); - let shouldApplyBackpressure = queueSize > 1; - - let strategy; - try { - strategy = stream._underlyingSource.strategy; - } catch (strategyE) { - stream._error(strategyE); - throw strategyE; - } - - if (strategy !== undefined) { - try { - shouldApplyBackpressure = Boolean(strategy.shouldApplyBackpressure(queueSize)); - } catch (shouldApplyBackpressureE) { - stream._error(shouldApplyBackpressureE); - throw shouldApplyBackpressureE; - } - } - - return shouldApplyBackpressure; -} diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index d3ee9a9fc..0e06bb098 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -1,37 +1,25 @@ const assert = require('assert'); -import * as helpers from './helpers'; -import { AcquireExclusiveStreamReader, CallReadableStreamPull, CancelReadableStream, CreateReadableStreamCloseFunction, - CreateReadableStreamEnqueueFunction, CreateReadableStreamErrorFunction, IsReadableStream, ReadableStreamEOS, - ReadFromReadableStream, ShouldReadableStreamApplyBackpressure } from './readable-stream-abstract-ops'; +import { InvokeOrNoop, PromiseInvokeOrNoop, typeIsObject } from './helpers'; +import { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize } from './queue-with-sizes'; export default class ReadableStream { constructor(underlyingSource = {}) { this._underlyingSource = underlyingSource; - this._readPromise = undefined; - this._initClosedPromise(); - this._queue = []; this._state = 'readable'; + this._initClosedPromise(); + this._storedError = undefined; this._started = false; this._draining = false; - this._readPromisePending = false; this._pullScheduled = false; this._pullingPromise = undefined; this._readableStreamReader = undefined; - - // readPromise: created and returned by read() if read() is called in a readable state. - // resets to undefined in a .then() after it fulfills. - - // _readPromisePending: when enqueueing, we can do one of two things: - // - if _readPromisePending is true, resolve readPromise with the chunk, skipping the queue - // - if _readPromisePending is false, go to the queue - // it gets set *synchronously* upon resolving or rejecting readPromise, whereas readPromise itself gets cleared - // *asynchronously* (after a .then()) + this._queue = []; this._enqueue = CreateReadableStreamEnqueueFunction(this); this._close = CreateReadableStreamCloseFunction(this); this._error = CreateReadableStreamErrorFunction(this); - const startResult = helpers.InvokeOrNoop(underlyingSource, 'start', [this._enqueue, this._close, this._error]); + const startResult = InvokeOrNoop(underlyingSource, 'start', [this._enqueue, this._close, this._error]); Promise.resolve(startResult).then( () => { this._started = true; @@ -49,14 +37,6 @@ export default class ReadableStream { return this._closedPromise; } - get state() { - if (!IsReadableStream(this)) { - throw new TypeError('ReadableStream.prototype.state can only be used on a ReadableStream'); - } - - return this._state; - } - cancel(reason) { if (!IsReadableStream(this)) { return Promise.reject(new TypeError('ReadableStream.prototype.cancel can only be used on a ReadableStream')); @@ -65,15 +45,15 @@ export default class ReadableStream { return CancelReadableStream(this, reason); } - pipeThrough({ writable, readable }, options) { - if (!helpers.typeIsObject(writable)) { - throw new TypeError('A transform stream must have an writable property that is an object.'); + getReader() { + if (!IsReadableStream(this)) { + throw new TypeError('ReadableStream.prototype.getReader can only be used on a ReadableStream'); } - if (!helpers.typeIsObject(readable)) { - throw new TypeError('A transform stream must have a readable property that is an object.'); - } + return AcquireReadableStreamReader(this); + } + pipeThrough({ writable, readable }, options) { this.pipeTo(writable, options); return readable; } @@ -84,7 +64,9 @@ export default class ReadableStream { preventCancel = Boolean(preventCancel); const source = this; - const EOS = source.constructor.EOS; + + let reader; + let lastRead; let closedPurposefully = false; let resolvePipeToPromise; let rejectPipeToPromise; @@ -93,6 +75,8 @@ export default class ReadableStream { resolvePipeToPromise = resolve; rejectPipeToPromise = reject; + reader = source.getReader(); + source.closed.catch(abortDest); dest.closed.then( () => { @@ -107,11 +91,12 @@ export default class ReadableStream { }); function doPipe() { - Promise.all([source.read(), dest.ready]).then(([chunk]) => { - if (chunk === EOS) { + lastRead = reader.read(); + Promise.all([lastRead, dest.ready]).then(([{ value, done }]) => { + if (Boolean(done) === true) { closeDest(); } else { - dest.write(chunk); + dest.write(value); doPipe(); } }); @@ -121,14 +106,26 @@ export default class ReadableStream { } function cancelSource(reason) { - const sourceState = source.state; - if (preventCancel === false && sourceState === 'readable') { + if (preventCancel === false) { + // cancelling automatically releases the lock (and that doesn't fail, since source is then closed) source.cancel(reason); + rejectPipeToPromise(reason); + } else { + // If we don't cancel, we need to wait for lastRead to finish before we're allowed to release. + // We don't need to handle lastRead failing because that will trigger abortDest which takes care of + // both of these. + lastRead.then(() => { + reader.releaseLock(); + rejectPipeToPromise(reason); + }); } - rejectPipeToPromise(reason); } function closeDest() { + // Does not need to wait for lastRead since it occurs only on source closed. + + reader.releaseLock(); + const destState = dest.state; if (preventClose === false && (destState === 'waiting' || destState === 'writable')) { closedPurposefully = true; @@ -139,6 +136,10 @@ export default class ReadableStream { } function abortDest(reason) { + // Does not need to wait for lastRead since it only occurs on source errored. + + reader.releaseLock(); + if (preventAbort === false) { dest.abort(reason); } @@ -146,21 +147,10 @@ export default class ReadableStream { } } - read() { - if (!IsReadableStream(this)) { - return Promise.reject(new TypeError('ReadableStream.prototype.read can only be used on a ReadableStream')); - } - return ReadFromReadableStream(this); - } - - - _initReadPromise() { - this._readPromise = new Promise((resolve, reject) => { - this._readPromise_resolve = resolve; - this._readPromise_reject = reject; - }); - } + // Note: The resolve function and reject function are cleared when the corresponding promise is resolved or rejected. + // This is for debugging. This makes extra resolve/reject calls for the same promise fail so that we can detect + // unexpected extra resolve/reject calls that may be caused by bugs in the algorithm. _initClosedPromise() { this._closedPromise = new Promise((resolve, reject) => { @@ -169,24 +159,6 @@ export default class ReadableStream { }); } - // Note: The resolve function and reject function are cleared when the - // corresponding promise is resolved or rejected. This is for debugging. This - // makes extra resolve/reject calls for the same promise fail so that we can - // detect unexpected extra resolve/reject calls that may be caused by bugs in - // the algorithm. - - _resolveReadPromise(value) { - this._readPromise_resolve(value); - this._readPromise_resolve = null; - this._readPromise_reject = null; - } - - _rejectReadPromise(reason) { - this._readPromise_reject(reason); - this._readPromise_resolve = null; - this._readPromise_reject = null; - } - _resolveClosedPromise(value) { this._closedPromise_resolve(value); this._closedPromise_resolve = null; @@ -198,11 +170,338 @@ export default class ReadableStream { this._closedPromise_resolve = null; this._closedPromise_reject = null; } +}; + +class ReadableStreamReader { + constructor(stream) { + if (!IsReadableStream(stream)) { + throw new TypeError('ReadableStreamReader can only be constructed with a ReadableStream instance'); + } + if (stream._state === 'closed') { + throw new TypeError('The stream has already been closed, so a reader cannot be acquired'); + } + if (stream._state === 'errored') { + throw stream._storedError; + } + if (IsReadableStreamLocked(stream)) { + throw new TypeError('This stream has already been locked for exclusive reading by another reader'); + } + + this._readRequests = []; + + this._closedPromise = new Promise((resolve, reject) => { + this._closedPromise_resolve = resolve; + this._closedPromise_reject = reject; + }); + + stream._readableStreamReader = this; + this._encapsulatedReadableStream = stream; + } + + get closed() { + if (!IsReadableStreamReader(this)) { + return Promise.reject( + new TypeError('ReadableStreamReader.prototype.closed can only be used on a ReadableStreamReader')); + } + + return this._closedPromise; + } + + get isActive() { + if (!IsReadableStreamReader(this)) { + throw new TypeError('ReadableStreamReader.prototype.isActive can only be used on a ReadableStreamReader'); + } + + return this._encapsulatedReadableStream !== undefined; + } + + cancel(reason) { + if (!IsReadableStreamReader(this)) { + return Promise.reject( + new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader')); + } + + if (this._encapsulatedReadableStream === undefined) { + return Promise.resolve(undefined); + } + + return CancelReadableStream(this._encapsulatedReadableStream, reason); + } + + read() { + if (!IsReadableStreamReader(this)) { + return Promise.reject( + new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader')); + } + + if (this._encapsulatedReadableStream === undefined || this._encapsulatedReadableStream._state === 'closed') { + return Promise.resolve({ value: undefined, done: true }); + } + + if (this._encapsulatedReadableStream._state === 'errored') { + return Promise.reject(this._encapsulatedReadableStream._storedError); + } + + if (this._encapsulatedReadableStream._queue.length > 0) { + const chunk = DequeueValue(this._encapsulatedReadableStream._queue); + + if (this._encapsulatedReadableStream._draining === true && this._encapsulatedReadableStream._queue.length === 0) { + CloseReadableStream(this._encapsulatedReadableStream); + } else { + CallReadableStreamPull(this._encapsulatedReadableStream); + } + + return Promise.resolve({ value: chunk, done: false }); + } else { + const readRequest = {}; + readRequest.promise = new Promise((resolve, reject) => { + readRequest._resolve = resolve; + readRequest._reject = reject; + }); + + this._readRequests.push(readRequest); + return readRequest.promise; + } + } + + releaseLock() { + if (!IsReadableStreamReader(this)) { + throw new TypeError('ReadableStreamReader.prototype.releaseLock can only be used on a ReadableStreamReader'); + } + + if (this._encapsulatedReadableStream === undefined) { + return undefined; + } + + if (this._readRequests.length > 0) { + throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled'); + } + + ReleaseReadableStreamReader(this); + } +} + +function AcquireReadableStreamReader(stream) { + return new ReadableStreamReader(stream); +} + +function CallReadableStreamPull(stream) { + if (stream._draining === true || stream._started === false || + stream._state === 'closed' || stream._state === 'errored' || + stream._pullScheduled === true) { + return undefined; + } + + if (stream._pullingPromise !== undefined) { + stream._pullScheduled = true; + stream._pullingPromise.then(() => { + stream._pullScheduled = false; + CallReadableStreamPull(stream); + }); + return undefined; + } + + const shouldApplyBackpressure = ShouldReadableStreamApplyBackpressure(stream); + if (shouldApplyBackpressure === true) { + return undefined; + } + + stream._pullingPromise = PromiseInvokeOrNoop(stream._underlyingSource, 'pull', [stream._enqueue, stream._close]); + stream._pullingPromise.then( + () => { stream._pullingPromise = undefined; }, + e => { stream._error(e); } + ); + + return undefined; +} + +function CancelReadableStream(stream, reason) { + if (stream._state === 'closed') { + return Promise.resolve(undefined); + } + if (stream._state === 'errored') { + return Promise.reject(stream._storedError); + } + + stream._queue = []; + CloseReadableStream(stream); + + const sourceCancelPromise = PromiseInvokeOrNoop(stream._underlyingSource, 'cancel', [reason]); + return sourceCancelPromise.then(() => undefined); +} + +function CloseReadableStream(stream) { + assert(stream._state === 'readable'); + + stream._state = 'closed'; + stream._resolveClosedPromise(undefined); + + if (stream._readableStreamReader !== undefined) { + ReleaseReadableStreamReader(stream._readableStreamReader); + } + + return undefined; +} + +function CreateReadableStreamCloseFunction(stream) { + return () => { + if (stream._state !== 'readable') { + return; + } + + if (stream._queue.length === 0) { + CloseReadableStream(stream); + } else { + stream._draining = true; + } + }; +} + +function CreateReadableStreamEnqueueFunction(stream) { + return chunk => { + if (stream._state === 'errored') { + throw stream._storedError; + } + + if (stream._state === 'closed') { + throw new TypeError('stream is closed'); + } + + if (stream._draining === true) { + throw new TypeError('stream is draining'); + } + + if (stream._readableStreamReader !== undefined && stream._readableStreamReader._readRequests.length > 0) { + const readRequest = stream._readableStreamReader._readRequests.shift(); + readRequest._resolve({ value: chunk, done: false }); + } else { + let chunkSize = 1; + + let strategy; + try { + strategy = stream._underlyingSource.strategy; + } catch (strategyE) { + stream._error(strategyE); + throw strategyE; + } + + if (strategy !== undefined) { + try { + chunkSize = strategy.size(chunk); + } catch (chunkSizeE) { + stream._error(chunkSizeE); + throw chunkSizeE; + } + } + + try { + EnqueueValueWithSize(stream._queue, chunk, chunkSize); + } catch (enqueueE) { + stream._error(enqueueE); + throw enqueueE; + } + } + + CallReadableStreamPull(stream); + + const shouldApplyBackpressure = ShouldReadableStreamApplyBackpressure(stream); + if (shouldApplyBackpressure === true) { + return false; + } + return true; + }; +} + +function CreateReadableStreamErrorFunction(stream) { + return e => { + if (stream._state !== 'readable') { + return; + } + + stream._state = 'errored'; + stream._storedError = e; + stream._rejectClosedPromise(e); + + if (stream._readableStreamReader !== undefined) { + stream._readableStreamReader._closedPromise_reject(e); + + for (const { _reject } of stream._readableStreamReader._readRequests) { + _reject(e); + } + stream._readableStreamReader._readRequests = []; + } + + stream._queue = []; + }; +} + +function IsReadableStream(x) { + if (!typeIsObject(x)) { + return false; + } + + if (!Object.prototype.hasOwnProperty.call(x, '_underlyingSource')) { + return false; + } + + return true; +} + +function IsReadableStreamLocked(stream) { + assert(IsReadableStream(stream) === true, 'IsReadableStreamLocked should only be used on known readable streams'); + + if (stream._readableStreamReader === undefined) { + return false; + } + + return true; +} + +function IsReadableStreamReader(x) { + if (!typeIsObject(x)) { + return false; + } + + if (!Object.prototype.hasOwnProperty.call(x, '_encapsulatedReadableStream')) { + return false; + } + + return true; +} + +function ReleaseReadableStreamReader(reader) { + assert(reader._encapsulatedReadableStream !== undefined); + + for (const { _resolve } of reader._readRequests) { + _resolve({ value: undefined, done: true }); + } + reader._readRequests = []; + + reader._encapsulatedReadableStream._readableStreamReader = undefined; + reader._encapsulatedReadableStream = undefined; + reader._closedPromise_resolve(undefined); } -Object.defineProperty(ReadableStream, 'EOS', { - value: ReadableStreamEOS, - enumerable: false, - configurable: false, - writable: false -}); +function ShouldReadableStreamApplyBackpressure(stream) { + const queueSize = GetTotalQueueSize(stream._queue); + let shouldApplyBackpressure = queueSize > 1; + + let strategy; + try { + strategy = stream._underlyingSource.strategy; + } catch (strategyE) { + stream._error(strategyE); + throw strategyE; + } + + if (strategy !== undefined) { + try { + shouldApplyBackpressure = Boolean(strategy.shouldApplyBackpressure(queueSize)); + } catch (shouldApplyBackpressureE) { + stream._error(shouldApplyBackpressureE); + throw shouldApplyBackpressureE; + } + } + + return shouldApplyBackpressure; +} diff --git a/reference-implementation/run-tests.js b/reference-implementation/run-tests.js index c6befec97..7a707b7f4 100644 --- a/reference-implementation/run-tests.js +++ b/reference-implementation/run-tests.js @@ -1,5 +1,6 @@ const glob = require('glob'); const path = require('path'); +const tape = require('tape-catch'); import ReadableStream from './lib/readable-stream'; import WritableStream from './lib/writable-stream'; @@ -15,13 +16,32 @@ global.ByteLengthQueuingStrategy = ByteLengthQueuingStrategy; global.CountQueuingStrategy = CountQueuingStrategy; global.TransformStream = TransformStream; +tape.Test.prototype.throwsExactly = function (fn, expected, message, extra) { + // Based on https://github.com/substack/tape/blob/8835f51c6c4a8d45c5a415def9e25a08c144a478/lib/test.js#L424 + let caught = undefined; + try { + fn(); + } catch (error) { + caught = { error }; + } + + const passed = caught && caught.error === expected; + this._assert(passed, { + message: message !== undefined ? message : 'should throw exactly', + operator: 'throws exactly', + actual: caught && caught.error, + expected: expected, + error: !passed && caught && caught.error, + extra: extra + }); +}; if (process.argv.length === 3) { - const tests = glob.sync(path.resolve(__dirname, 'test/*.js')); + const tests = glob.sync(path.resolve(__dirname, 'test/*.js')); - // disable experimental tests while we figure out impact of async read on ReadableByteStream - const experimentalTests = []; // glob.sync(path.resolve(__dirname, 'test/experimental/*.js')); - tests.concat(experimentalTests).forEach(require); + // disable experimental tests while we figure out impact of async read on ReadableByteStream + const experimentalTests = []; // glob.sync(path.resolve(__dirname, 'test/experimental/*.js')); + tests.concat(experimentalTests).forEach(require); } else { glob.sync(path.resolve(process.argv[3])).forEach(require); } diff --git a/reference-implementation/test/bad-underlying-sources.js b/reference-implementation/test/bad-underlying-sources.js index ea18834fe..27562d725 100644 --- a/reference-implementation/test/bad-underlying-sources.js +++ b/reference-implementation/test/bad-underlying-sources.js @@ -59,7 +59,7 @@ test('Underlying source: throwing pull method (initial pull)', t => { }); test('Underlying source: throwing pull getter (second pull)', t => { - t.plan(4); + t.plan(2); const theError = new Error('a unique string'); let counter = 0; @@ -74,12 +74,7 @@ test('Underlying source: throwing pull getter (second pull)', t => { } }); - t.equal(rs.state, 'readable', 'the stream should start readable'); - - rs.read().then(v => { - t.equal(rs.state, 'readable', 'the stream should not be errored on the first read'); - t.equal(v, 'a', 'the chunk read should be correct'); - }); + rs.getReader().read().then(r => t.deepEqual(r, { value: 'a', done: false }, 'the chunk read should be correct')); rs.closed.then( () => t.fail('closed should not fulfill'), @@ -88,7 +83,7 @@ test('Underlying source: throwing pull getter (second pull)', t => { }); test('Underlying source: throwing pull method (second pull)', t => { - t.plan(4); + t.plan(2); const theError = new Error('a unique string'); let counter = 0; @@ -103,12 +98,7 @@ test('Underlying source: throwing pull method (second pull)', t => { } }); - t.equal(rs.state, 'readable', 'the stream should start readable'); - - rs.read().then(v => { - t.equal(rs.state, 'readable', 'the stream should not be errored on the first read'); - t.equal(v, 'a', 'the chunk read should be correct'); - }); + rs.getReader().read().then(r => t.deepEqual(r, { value: 'a', done: false }, 'the chunk read should be correct')); rs.closed.then( () => t.fail('closed should not fulfill'), @@ -162,7 +152,7 @@ test('Underlying source: throwing strategy getter', t => { } }); - t.equal(rs.state, 'errored', 'state should be errored'); + rs.closed.catch(e => t.equal(e, theError, 'closed should reject with the error')); }); test('Underlying source: throwing strategy.size getter', t => { @@ -183,7 +173,7 @@ test('Underlying source: throwing strategy.size getter', t => { } }); - t.equal(rs.state, 'errored', 'state should be errored'); + rs.closed.catch(e => t.equal(e, theError, 'closed should reject with the error')); }); test('Underlying source: throwing strategy.size method', t => { @@ -204,7 +194,7 @@ test('Underlying source: throwing strategy.size method', t => { } }); - t.equal(rs.state, 'errored', 'state should be errored'); + rs.closed.catch(e => t.equal(e, theError, 'closed should reject with the error')); }); test('Underlying source: throwing strategy.shouldApplyBackpressure getter', t => { @@ -225,7 +215,7 @@ test('Underlying source: throwing strategy.shouldApplyBackpressure getter', t => } }); - t.equal(rs.state, 'errored', 'state should be errored'); + rs.closed.catch(e => t.equal(e, theError, 'closed should reject with the error')); }); test('Underlying source: throwing strategy.shouldApplyBackpressure method', t => { @@ -246,12 +236,13 @@ test('Underlying source: throwing strategy.shouldApplyBackpressure method', t => } }); - t.equal(rs.state, 'errored', 'state should be errored'); + rs.closed.catch(e => t.equal(e, theError, 'closed should reject with the error')); }); test('Underlying source: strategy.size returning NaN', t => { t.plan(2); + let theError; const rs = new ReadableStream({ start(enqueue) { try { @@ -259,6 +250,7 @@ test('Underlying source: strategy.size returning NaN', t => { t.fail('enqueue didn\'t throw'); } catch (error) { t.equal(error.constructor, RangeError, 'enqueue should throw a RangeError'); + theError = error; } }, strategy: { @@ -271,12 +263,13 @@ test('Underlying source: strategy.size returning NaN', t => { } }); - t.equal(rs.state, 'errored', 'state should be errored'); + rs.closed.catch(e => t.equal(e, theError, 'closed should reject with the error')); }); test('Underlying source: strategy.size returning -Infinity', t => { t.plan(2); + let theError; const rs = new ReadableStream({ start(enqueue) { try { @@ -284,6 +277,7 @@ test('Underlying source: strategy.size returning -Infinity', t => { t.fail('enqueue didn\'t throw'); } catch (error) { t.equal(error.constructor, RangeError, 'enqueue should throw a RangeError'); + theError = error; } }, strategy: { @@ -296,12 +290,13 @@ test('Underlying source: strategy.size returning -Infinity', t => { } }); - t.equal(rs.state, 'errored', 'state should be errored'); + rs.closed.catch(e => t.equal(e, theError, 'closed should reject with the error')); }); test('Underlying source: strategy.size returning +Infinity', t => { t.plan(2); + let theError; const rs = new ReadableStream({ start(enqueue) { try { @@ -309,6 +304,7 @@ test('Underlying source: strategy.size returning +Infinity', t => { t.fail('enqueue didn\'t throw'); } catch (error) { t.equal(error.constructor, RangeError, 'enqueue should throw a RangeError'); + theError = error; } }, strategy: { @@ -321,5 +317,5 @@ test('Underlying source: strategy.size returning +Infinity', t => { } }); - t.equal(rs.state, 'errored', 'state should be errored'); + rs.closed.catch(e => t.equal(e, theError, 'closed should reject with the error')); }); diff --git a/reference-implementation/test/brand-checks.js b/reference-implementation/test/brand-checks.js index 3a9aaaa48..f94ab50c6 100644 --- a/reference-implementation/test/brand-checks.js +++ b/reference-implementation/test/brand-checks.js @@ -1,16 +1,22 @@ const test = require('tape-catch'); +let ReadableStreamReader; + +test('Can get the ReadableStreamReader constructor indirectly', t => { + t.doesNotThrow(() => { + // It's not exposed globally, but we test a few of its properties here. + ReadableStreamReader = (new ReadableStream()).getReader().constructor; + }); + t.end(); +}); + function fakeReadableStream() { return { get closed() { return Promise.resolve(); }, - get state() { return 'closed' }, cancel(reason) { return Promise.resolve(); }, pipeThrough({ writable, readable }, options) { return readable; }, pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { return Promise.resolve(); }, - read() { return Promise.resolve(ReadableStream.EOS); }, - constructor: { - EOS: ReadableStream.EOS - } + getReader() { return new ReadableStream(new ReadableStream()); } }; } @@ -33,6 +39,16 @@ function realWritableStream() { return new WritableStream(); } +function fakeReadableStreamReader() { + return { + get closed() { return Promise.resolve(); }, + get isActive() { return false; }, + cancel(reason) { return Promise.resolve(); }, + read() { return Promise.resolve({ value: undefined, done: true }); }, + releaseLock() { return; } + }; +} + function fakeByteLengthQueuingStrategy() { return { shouldApplyBackpressure(queueSize) { @@ -99,18 +115,18 @@ test('ReadableStream.prototype.closed enforces a brand check', t => { getterRejects(t, ReadableStream.prototype, 'closed', realWritableStream()); }); -test('ReadableStream.prototype.state enforces a brand check', t => { - t.plan(2); - getterThrows(t, ReadableStream.prototype, 'state', fakeReadableStream()); - getterThrows(t, ReadableStream.prototype, 'state', realWritableStream()); -}); - test('ReadableStream.prototype.cancel enforces a brand check', t => { t.plan(2); methodRejects(t, ReadableStream.prototype, 'cancel', fakeReadableStream()); methodRejects(t, ReadableStream.prototype, 'cancel', realWritableStream()); }); +test('ReadableStream.prototype.getReader enforces a brand check', t => { + t.plan(2); + methodThrows(t, ReadableStream.prototype, 'getReader', fakeReadableStream()); + methodThrows(t, ReadableStream.prototype, 'getReader', realWritableStream()); +}); + test('ReadableStream.prototype.pipeThrough works generically on its this and its arguments', t => { t.plan(2); @@ -137,10 +153,43 @@ test('ReadableStream.prototype.pipeTo works generically on its this and its argu t.doesNotThrow(() => ReadableStream.prototype.pipeTo.call(fakeReadableStream(), fakeWritableStream())); }); -test('ReadableStream.prototype.read enforces a brand check', t => { + + + +test('ReadableStreamReader enforces a brand check on its argument', t => { + t.plan(1); + t.throws(() => new ReadableStreamReader(fakeReadableStream()), /TypeError/, 'Contructing a ReadableStreamReader ' + + 'should throw'); +}); + +test('ReadableStreamReader.prototype.closed enforces a brand check', t => { + t.plan(2); + getterRejects(t, ReadableStreamReader.prototype, 'closed', fakeReadableStreamReader()); + getterRejects(t, ReadableStreamReader.prototype, 'closed', realReadableStream()); +}); + +test('ReadableStreamReader.prototype.isActive enforces a brand check', t => { + t.plan(2); + getterThrows(t, ReadableStreamReader.prototype, 'isActive', fakeReadableStreamReader()); + getterThrows(t, ReadableStreamReader.prototype, 'isActive', realReadableStream()); +}); + +test('ReadableStreamReader.prototype.cancel enforces a brand check', t => { + t.plan(2); + methodRejects(t, ReadableStreamReader.prototype, 'cancel', fakeReadableStreamReader()); + methodRejects(t, ReadableStreamReader.prototype, 'cancel', realReadableStream()); +}); + +test('ReadableStreamReader.prototype.read enforces a brand check', t => { + t.plan(2); + methodRejects(t, ReadableStreamReader.prototype, 'read', fakeReadableStreamReader()); + methodRejects(t, ReadableStreamReader.prototype, 'read', realReadableStream()); +}); + +test('ReadableStreamReader.prototype.releaseLock enforces a brand check', t => { t.plan(2); - methodRejects(t, ReadableStream.prototype, 'read', fakeReadableStream()); - methodRejects(t, ReadableStream.prototype, 'read', realWritableStream()); + methodThrows(t, ReadableStreamReader.prototype, 'releaseLock', fakeReadableStreamReader()); + methodThrows(t, ReadableStreamReader.prototype, 'releaseLock', realReadableStream()); }); diff --git a/reference-implementation/test/count-queuing-strategy.js b/reference-implementation/test/count-queuing-strategy.js index a15c78757..f2bf98798 100644 --- a/reference-implementation/test/count-queuing-strategy.js +++ b/reference-implementation/test/count-queuing-strategy.js @@ -30,31 +30,37 @@ test('Correctly governs the return value of a ReadableStream\'s enqueue function start(enqueue_) { enqueue = enqueue_; }, strategy: new CountQueuingStrategy({ highWaterMark: 0 }) }); + const reader = rs.getReader(); t.equal(enqueue('a'), false, 'After 0 reads, 1st enqueue should return false (queue now contains 1 chunk)'); t.equal(enqueue('b'), false, 'After 0 reads, 2nd enqueue should return false (queue now contains 2 chunks)'); t.equal(enqueue('c'), false, 'After 0 reads, 3rd enqueue should return false (queue now contains 3 chunks)'); t.equal(enqueue('d'), false, 'After 0 reads, 4th enqueue should return false (queue now contains 4 chunks)'); - rs.read().then(chunk => { - t.equal(chunk, 'a', '1st read gives back the 1st chunk enqueued (queue now contains 3 chunks)'); - return rs.read(); + reader.read().then(result => { + t.deepEqual(result, { value: 'a', done: false }, + '1st read gives back the 1st chunk enqueued (queue now contains 3 chunks)'); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'b', '2nd read gives back the 2nd chunk enqueued (queue now contains 2 chunks)'); - return rs.read(); + .then(result => { + t.deepEqual(result, { value: 'b', done: false }, + '2nd read gives back the 2nd chunk enqueued (queue now contains 2 chunks)'); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'c', '3rd read gives back the 2nd chunk enqueued (queue now contains 1 chunk)'); + .then(result => { + t.deepEqual(result, { value: 'c', done: false }, + '3rd read gives back the 3rd chunk enqueued (queue now contains 1 chunk)'); t.equal(enqueue('e'), false, 'After 3 reads, 5th enqueue should return false (queue now contains 2 chunks)'); - return rs.read(); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'd', '4th read gives back the 3rd chunk enqueued (queue now contains 1 chunks)'); - return rs.read(); + .then(result => { + t.deepEqual(result, { value: 'd', done: false }, + '4th read gives back the 4th chunk enqueued (queue now contains 1 chunks)'); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'e', '5th read gives back the 4th chunk enqueued (queue now contains 0 chunks)'); + .then(result => { + t.deepEqual(result, { value: 'e', done: false }, + '5th read gives back the 5th chunk enqueued (queue now contains 0 chunks)'); t.equal(enqueue('f'), false, 'After 5 reads, 6th enqueue should return false (queue now contains 1 chunk)'); t.equal(enqueue('g'), false, 'After 5 reads, 7th enqueue should return false (queue now contains 2 chunks)'); t.end(); @@ -68,31 +74,37 @@ test('Correctly governs the return value of a ReadableStream\'s enqueue function start(enqueue_) { enqueue = enqueue_; }, strategy: new CountQueuingStrategy({ highWaterMark: 1 }) }); + const reader = rs.getReader(); t.equal(enqueue('a'), true, 'After 0 reads, 1st enqueue should return true (queue now contains 1 chunk)'); t.equal(enqueue('b'), false, 'After 0 reads, 2nd enqueue should return false (queue now contains 2 chunks)'); t.equal(enqueue('c'), false, 'After 0 reads, 3rd enqueue should return false (queue now contains 3 chunks)'); t.equal(enqueue('d'), false, 'After 0 reads, 4th enqueue should return false (queue now contains 4 chunks)'); - rs.read().then(chunk => { - t.equal(chunk, 'a', '1st read gives back the 1st chunk enqueued (queue now contains 3 chunks)'); - return rs.read(); + reader.read().then(result => { + t.deepEqual(result, { value: 'a', done: false }, + '1st read gives back the 1st chunk enqueued (queue now contains 3 chunks)'); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'b', '2nd read gives back the 2nd chunk enqueued (queue now contains 2 chunks)'); - return rs.read(); + .then(result => { + t.deepEqual(result, { value: 'b', done: false }, + '2nd read gives back the 2nd chunk enqueued (queue now contains 2 chunks)'); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'c', '3rd read gives back the 2nd chunk enqueued (queue now contains 1 chunk)'); + .then(result => { + t.deepEqual(result, { value: 'c', done: false }, + '3rd read gives back the 3rd chunk enqueued (queue now contains 1 chunk)'); t.equal(enqueue('e'), false, 'After 3 reads, 5th enqueue should return false (queue now contains 2 chunks)'); - return rs.read(); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'd', '4th read gives back the 3rd chunk enqueued (queue now contains 1 chunks)'); - return rs.read(); + .then(result => { + t.deepEqual(result, { value: 'd', done: false }, + '4th read gives back the 4th chunk enqueued (queue now contains 1 chunks)'); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'e', '5th read gives back the 4th chunk enqueued (queue now contains 0 chunks)'); + .then(result => { + t.deepEqual(result, { value: 'e', done: false }, + '5th read gives back the 5th chunk enqueued (queue now contains 0 chunks)'); t.equal(enqueue('f'), true, 'After 5 reads, 6th enqueue should return true (queue now contains 1 chunk)'); t.equal(enqueue('g'), false, 'After 5 reads, 7th enqueue should return false (queue now contains 2 chunks)'); t.end(); @@ -106,6 +118,7 @@ test('Correctly governs the return value of a ReadableStream\'s enqueue function start(enqueue_) { enqueue = enqueue_; }, strategy: new CountQueuingStrategy({ highWaterMark: 4 }) }); + const reader = rs.getReader(); t.equal(enqueue('a'), true, 'After 0 reads, 1st enqueue should return true (queue now contains 1 chunk)'); t.equal(enqueue('b'), true, 'After 0 reads, 2nd enqueue should return true (queue now contains 2 chunks)'); @@ -114,29 +127,35 @@ test('Correctly governs the return value of a ReadableStream\'s enqueue function t.equal(enqueue('e'), false, 'After 0 reads, 5th enqueue should return false (queue now contains 5 chunks)'); t.equal(enqueue('f'), false, 'After 0 reads, 6th enqueue should return false (queue now contains 6 chunks)'); - rs.read().then(chunk => { - t.equal(chunk, 'a', '1st read gives back the 1st chunk enqueued (queue now contains 5 chunks)'); - return rs.read(); + reader.read().then(result => { + t.deepEqual(result, { value: 'a', done: false }, + '1st read gives back the 1st chunk enqueued (queue now contains 5 chunks)'); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'b', '2nd read gives back the 2nd chunk enqueued (queue now contains 4 chunks)'); + .then(result => { + t.deepEqual(result, { value: 'b', done: false }, + '2nd read gives back the 2nd chunk enqueued (queue now contains 4 chunks)'); t.equal(enqueue('g'), false, 'After 2 reads, 7th enqueue should return false (queue now contains 5 chunks)'); - return rs.read(); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'c', '3rd read gives back the 3rd chunk enqueued (queue now contains 4 chunks)'); - return rs.read(); + .then(result => { + t.deepEqual(result, { value: 'c', done: false }, + '3rd read gives back the 3rd chunk enqueued (queue now contains 4 chunks)'); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'd', '4th read gives back the 4th chunk enqueued (queue now contains 3 chunks)'); - return rs.read(); + .then(result => { + t.deepEqual(result, { value: 'd', done: false }, + '4th read gives back the 4th chunk enqueued (queue now contains 3 chunks)'); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'e', '5th read gives back the 5th chunk enqueued (queue now contains 2 chunks)'); - return rs.read(); + .then(result => { + t.deepEqual(result, { value: 'e', done: false }, + '5th read gives back the 5th chunk enqueued (queue now contains 2 chunks)'); + return reader.read(); }) - .then(chunk => { - t.equal(chunk, 'f', '6th read gives back the 6th chunk enqueued (queue now contains 1 chunk)'); + .then(result => { + t.deepEqual(result, { value: 'f', done: false }, + '6th read gives back the 6th chunk enqueued (queue now contains 1 chunk)'); t.equal(enqueue('h'), true, 'After 6 reads, 8th enqueue should return true (queue now contains 2 chunks)'); t.equal(enqueue('i'), true, 'After 6 reads, 9th enqueue should return true (queue now contains 3 chunks)'); t.equal(enqueue('j'), true, 'After 6 reads, 10th enqueue should return true (queue now contains 4 chunks)'); diff --git a/reference-implementation/test/pipe-through.js b/reference-implementation/test/pipe-through.js index feb833b97..079052e86 100644 --- a/reference-implementation/test/pipe-through.js +++ b/reference-implementation/test/pipe-through.js @@ -13,7 +13,7 @@ test('Piping through a duck-typed pass-through transform stream works', t => { }); test('Piping through an identity transform stream will close the destination when the source closes', t => { - t.plan(2); + t.plan(1); const rs = new ReadableStream({ start(enqueue, close) { @@ -34,7 +34,6 @@ test('Piping through an identity transform stream will close the destination whe const ws = new WritableStream(); rs.pipeThrough(ts).pipeTo(ws).then(() => { - t.equal(rs.state, 'closed', 'the readable stream was closed'); t.equal(ws.state, 'closed', 'the writable stream was closed'); }) .catch(e => t.error(e)); diff --git a/reference-implementation/test/pipe-to-options.js b/reference-implementation/test/pipe-to-options.js index b0d511c57..cba03febf 100644 --- a/reference-implementation/test/pipe-to-options.js +++ b/reference-implementation/test/pipe-to-options.js @@ -10,13 +10,9 @@ test('Piping with no options and no errors', t => { } }); - rs.pipeTo(ws); - - rs.closed.then(() => { - setTimeout(() => { - t.equal(ws.state, 'closed', 'destination should be closed'); - t.end(); - }, 0); + rs.pipeTo(ws).then(() => { + t.equal(ws.state, 'closed', 'destination should be closed'); + t.end(); }); }); @@ -28,13 +24,9 @@ test('Piping with { preventClose: false } and no errors', t => { } }); - rs.pipeTo(ws, { preventClose: false }); - - rs.closed.then(() => { - setTimeout(() => { - t.equal(ws.state, 'closed', 'destination should be closed'); - t.end(); - }, 0); + rs.pipeTo(ws, { preventClose: false }).then(() => { + t.equal(ws.state, 'closed', 'destination should be closed'); + t.end(); }); }); @@ -50,23 +42,9 @@ test('Piping with { preventClose: true } and no errors', t => { } }); - const pipeToPromise = rs.pipeTo(ws, { preventClose: true }); - - rs.closed.then(() => { - setTimeout(() => { - t.equal(ws.state, 'writable', 'destination should be writable'); - - pipeToPromise.then( - v => { - t.equal(v, undefined); - t.end(); - }, - r => { - t.fail('pipeToPromise is rejected'); - t.end(); - } - ); - }, 0); + rs.pipeTo(ws, { preventClose: true }).then(() => { + t.equal(ws.state, 'writable', 'destination should be writable'); + t.end(); }); }); @@ -118,24 +96,11 @@ test('Piping with { preventAbort: true } and a source error', t => { } }); - const pipeToPromise = rs.pipeTo(ws, { preventAbort: true }); - - rs.closed.catch(() => { - setTimeout(() => { - t.equal(ws.state, 'writable', 'destination should remain writable'); - - pipeToPromise.then( - () => { - t.fail('pipeToPromise is fulfilled'); - t.end(); - }, - r => { - t.equal(r, theError, 'rejection reason of pipeToPromise is the source error'); - t.end(); - } - ); - }, 0); - }) + rs.pipeTo(ws, { preventAbort: true }).catch(e => { + t.equal(ws.state, 'writable', 'destination should remain writable'); + t.equal(e, theError, 'rejection reason of pipeToPromise is the source error'); + t.end(); + }); }); test('Piping with no options and a destination error', t => { @@ -197,10 +162,11 @@ test('Piping with { preventCancel: false } and a destination error', t => { test('Piping with { preventCancel: true } and a destination error', t => { const theError = new Error('destination error'); const rs = new ReadableStream({ - start(enqueue, close) { + start(enqueue) { enqueue('a'); setTimeout(() => enqueue('b'), 10); setTimeout(() => enqueue('c'), 20); + setTimeout(() => enqueue('d'), 30); }, cancel(r) { t.fail('unexpected call to cancel'); @@ -216,22 +182,18 @@ test('Piping with { preventCancel: true } and a destination error', t => { } }); - const pipeToPromise = rs.pipeTo(ws, { preventCancel: true }); + rs.pipeTo(ws, { preventCancel: true }).catch(e => { + t.equal(e, theError, 'rejection reason of pipeTo promise is the sink error'); - ws.closed.catch(() => { - setTimeout(() => { - t.equal(rs.state, 'readable', 'source should remain readable'); + let reader; + t.doesNotThrow(() => { reader = rs.getReader(); }, 'should be able to get a stream reader after pipeTo completes'); - pipeToPromise.then( - () => { - t.fail('pipeToPromise is fulfilled'); - t.end(); - }, - r => { - t.equal(r, theError, 'rejection reason of pipeToPromise is the sink error'); - t.end(); - } - ); - }, 30); - }); + // { value: 'c', done: false } gets consumed before we know that ws has errored, and so is lost. + + return reader.read().then(result => { + t.deepEqual(result, { value: 'd', done: false }, 'should be able to read the remaining chunk from the reader'); + t.end(); + }); + }) + .catch(e => t.error(e)); }); diff --git a/reference-implementation/test/pipe-to.js b/reference-implementation/test/pipe-to.js index f68eb8ecf..f7e098cfc 100644 --- a/reference-implementation/test/pipe-to.js +++ b/reference-implementation/test/pipe-to.js @@ -5,7 +5,7 @@ import sequentialReadableStream from './utils/sequential-rs'; // TODO: many asserts in this file are unlabeled; we should label them. test('Piping from a ReadableStream from which lots of data are readable synchronously', t => { - t.plan(5); + t.plan(4); const rs = new ReadableStream({ start(enqueue, close) { @@ -16,8 +16,6 @@ test('Piping from a ReadableStream from which lots of data are readable synchron } }); - t.equal(rs.state, 'readable', 'readable stream state should start out readable'); - const ws = new WritableStream({ strategy: new CountQueuingStrategy({ highWaterMark: 1000 @@ -26,11 +24,16 @@ test('Piping from a ReadableStream from which lots of data are readable synchron t.equal(ws.state, 'writable', 'writable stream state should start out writable'); + let rsClosed = false; + rs.closed.then(() => { + rsClosed = true; + }); + let pipeFinished = false; rs.pipeTo(ws).then( () => { pipeFinished = true; - t.equal(rs.state, 'closed', 'readable stream state should be closed after pipe finishes'); + t.equal(rsClosed, true, 'readable stream should be closed after pipe finishes'); t.equal(ws.state, 'closed', 'writable stream state should be closed after pipe finishes'); }, e => t.error(e) @@ -42,7 +45,7 @@ test('Piping from a ReadableStream from which lots of data are readable synchron }); test('Piping from a ReadableStream in readable state to a WritableStream in closing state', t => { - t.plan(5); + t.plan(4); let cancelReason; const rs = new ReadableStream({ @@ -54,7 +57,6 @@ test('Piping from a ReadableStream in readable state to a WritableStream in clos cancelReason = reason; } }); - t.equal(rs.state, 'readable', 'readable stream should start in the readable state'); const ws = new WritableStream({ write() { @@ -68,12 +70,17 @@ test('Piping from a ReadableStream in readable state to a WritableStream in clos ws.close(); t.equal(ws.state, 'closing', 'writable stream should be closing immediately after closing it'); + let rsClosed = false; + rs.closed.then(() => { + rsClosed = true; + }); + rs.pipeTo(ws).then( () => t.fail('promise returned by pipeTo should not fulfill'), r => { t.equal(r, cancelReason, 'the pipeTo promise should reject with the same error as the underlying source cancel was called with'); - t.equal(rs.state, 'closed', 'the readable stream should be closed when the pipe finishes'); + t.equal(rsClosed, true, 'readable stream should be closed after pipe finishes'); } ); }); @@ -96,10 +103,13 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro t.equal(reason, passedError); } }); - t.equal(rs.state, 'readable'); let writeCalled = false; + const startPromise = Promise.resolve(); const ws = new WritableStream({ + start() { + return startPromise; + }, write(chunk) { t.assert(!writeCalled, 'write must not be called more than once'); writeCalled = true; @@ -118,28 +128,24 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro } }); - // Wait for ws to start. - setTimeout(() => { + startPromise.then(() => { ws.write('Hello'); t.assert(writeCalled, 'write must be called'); ws.ready.then(() => { t.equal(ws.state, 'errored', 'as a result of rejected promise, ws must be in errored state'); - rs.pipeTo(ws); - - // Need to delay because pipeTo retrieves error from dest using ready. - setTimeout(() => { - t.assert(cancelCalled); - t.equal(rs.state, 'closed'); + rs.pipeTo(ws).catch(e => { + t.equal(e, passedError, 'pipeTo promise should be rejected with the error'); + t.assert(cancelCalled, 'cancel should have been called'); t.end(); - }, 0); + }); }); - }, 0); + }); }); test('Piping from a ReadableStream in the closed state to a WritableStream in the writable state', t => { - t.plan(4); + t.plan(3); const rs = new ReadableStream({ start(enqueue, close) { @@ -152,7 +158,6 @@ test('Piping from a ReadableStream in the closed state to a WritableStream in th t.fail('Unexpected cancel call'); } }); - t.equal(rs.state, 'closed'); const startPromise = Promise.resolve(); const ws = new WritableStream({ @@ -163,7 +168,7 @@ test('Piping from a ReadableStream in the closed state to a WritableStream in th t.fail('Unexpected write call'); }, close() { - t.pass('underlying sink close should be called'); + t.fail('Unexpected close call'); }, abort() { t.fail('Unexpected abort call'); @@ -171,14 +176,20 @@ test('Piping from a ReadableStream in the closed state to a WritableStream in th }); startPromise.then(() => { - t.equal(ws.state, 'writable'); + t.equal(ws.state, 'writable', 'writable stream should start in writable state'); - rs.pipeTo(ws).then(v => t.equal(v, undefined, 'pipeTo promise should be fulfilled with undefined')); + rs.pipeTo(ws).then( + () => t.fail('pipeTo promise should not fulfill'), + e => { + t.equal(e.constructor, TypeError, 'pipeTo promise should be rejected with a TypeError'); + t.equal(ws.state, 'writable', 'writable stream should still be writable'); + } + ); }); }); test('Piping from a ReadableStream in the errored state to a WritableStream in the writable state', t => { - t.plan(4); + t.plan(3); const theError = new Error('piping is too hard today'); const rs = new ReadableStream({ @@ -192,7 +203,6 @@ test('Piping from a ReadableStream in the errored state to a WritableStream in t t.fail('Unexpected cancel call'); } }); - t.equal(rs.state, 'errored'); const startPromise = Promise.resolve(); const ws = new WritableStream({ @@ -206,7 +216,7 @@ test('Piping from a ReadableStream in the errored state to a WritableStream in t t.fail('Unexpected close call'); }, abort() { - t.pass('underlying sink abort should be called'); + t.fail('Unexpected abort call'); } }); @@ -215,7 +225,10 @@ test('Piping from a ReadableStream in the errored state to a WritableStream in t rs.pipeTo(ws).then( () => t.fail('pipeTo promise should not be fulfilled'), - e => t.equal(e, theError, 'pipeTo promise should be rejected with the passed error') + e => { + t.equal(e, theError, 'pipeTo promise should be rejected with the passed error'); + t.equal(ws.state, 'writable', 'writable stream should still be writable'); + } ); }); }); @@ -238,7 +251,6 @@ test('Piping from a ReadableStream in the readable state which becomes closed af t.fail('Unexpected cancel call'); } }); - t.equal(rs.state, 'readable', 'readable stream should start in the readable state'); let writeCalled = false; const startPromise = Promise.resolve(); @@ -264,7 +276,10 @@ test('Piping from a ReadableStream in the readable state which becomes closed af }); startPromise.then(() => { - rs.pipeTo(ws); + rs.pipeTo(ws).then(() => { + t.equal(ws.state, 'closed', 'writable stream should be closed after pipeTo completes'); + }); + t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo'); closeReadableStream(); @@ -273,7 +288,7 @@ test('Piping from a ReadableStream in the readable state which becomes closed af test('Piping from a ReadableStream in the readable state which becomes errored after pipeTo call to a WritableStream ' + 'in the writable state', t => { - t.plan(4); + t.plan(5); let errorReadableStream; let pullCount = 0; @@ -289,7 +304,6 @@ test('Piping from a ReadableStream in the readable state which becomes errored a t.fail('Unexpected cancel call'); } }); - t.equal(rs.state, 'readable', 'readable stream should start in the readable state'); let passedError = new Error('horrible things'); const startPromise = Promise.resolve(); @@ -310,7 +324,11 @@ test('Piping from a ReadableStream in the readable state which becomes errored a }); startPromise.then(() => { - rs.pipeTo(ws); + rs.pipeTo(ws).catch(e => { + t.equal(e, passedError, 'pipeTo should be rejected with the passed error'); + t.equal(ws.state, 'errored', 'writable stream should be errored after pipeTo completes'); + }); + t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo'); errorReadableStream(passedError); @@ -319,6 +337,7 @@ test('Piping from a ReadableStream in the readable state which becomes errored a test('Piping from an empty ReadableStream which becomes non-empty after pipeTo call to a WritableStream in the ' + 'writable state', t => { + t.plan(3); let enqueue; let pullCount = 0; const rs = new ReadableStream({ @@ -330,36 +349,31 @@ test('Piping from an empty ReadableStream which becomes non-empty after pipeTo c }, cancel() { t.fail('Unexpected cancel call'); - t.end(); } }); const ws = new WritableStream({ write(chunk) { - t.equal(chunk, 'Hello'); - t.equal(pullCount, 1); - t.end(); + t.equal(chunk, 'Hello', 'underlying sink write should be called with the single chunk'); + t.equal(pullCount, 1, 'pull should have been called once'); }, close() { t.fail('Unexpected close call'); - t.end(); }, abort(reason) { t.fail('Unexpected abort call'); - t.end(); } }); - rs.pipeTo(ws); - t.equal(rs.state, 'readable'); - t.equal(ws.state, 'writable'); + rs.pipeTo(ws).then(() => t.fail('pipeTo promise should not fulfill')); + t.equal(ws.state, 'writable', 'writable stream should start in writable state'); enqueue('Hello'); }); test('Piping from an empty ReadableStream which becomes errored after pipeTo call to a WritableStream in the ' + 'writable state', t => { - t.plan(4); + t.plan(3); let errorReadableStream; const rs = new ReadableStream({ @@ -368,11 +382,9 @@ test('Piping from an empty ReadableStream which becomes errored after pipeTo cal }, pull() { t.fail('Unexpected pull call'); - t.end(); }, cancel() { t.fail('Unexpected cancel call'); - t.end(); } }); @@ -380,23 +392,18 @@ test('Piping from an empty ReadableStream which becomes errored after pipeTo cal const ws = new WritableStream({ write() { t.fail('Unexpected write call'); - t.end(); }, close() { t.fail('Unexpected close call'); - t.end(); }, abort(reason) { t.equal(reason, passedError, 'underlying sink abort should receive the error from the readable stream'); } }); - rs.pipeTo(ws); - t.equal(rs.state, 'readable', 'readable stream should start out readable'); + rs.pipeTo(ws).catch(e => t.equal(e, passedError, 'pipeTo should reject with the passed error')); t.equal(ws.state, 'writable', 'writable stream should start out writable'); - errorReadableStream(passedError); - t.equal(rs.state, 'errored', 'readable stream should become errored'); }); test('Piping from an empty ReadableStream to a WritableStream in the writable state which becomes errored after a ' + @@ -412,7 +419,7 @@ test('Piping from an empty ReadableStream to a WritableStream in the writable st }, cancel(reason) { t.equal(reason, theError, 'underlying source cancellation reason should be the writable stream error'); - t.equal(pullCount, 2, 'pull should have been called once by cancel-time'); + t.equal(pullCount, 1, 'pull should have been called once by cancel-time'); } }); @@ -437,8 +444,7 @@ test('Piping from an empty ReadableStream to a WritableStream in the writable st startPromise.then(() => { t.equal(ws.state, 'writable', 'ws should start writable'); - rs.pipeTo(ws); - t.equal(rs.state, 'readable', 'rs should be readable after pipe'); + rs.pipeTo(ws).catch(e => t.equal(e, theError, 'pipeTo should reject with the passed error')); t.equal(ws.state, 'writable', 'ws should be writable after pipe'); errorWritableStream(theError); @@ -461,7 +467,6 @@ test('Piping from a non-empty ReadableStream to a WritableStream in the waiting t.fail('Unexpected cancel call'); } }); - t.equal(rs.state, 'readable'); let resolveWritePromise; const startPromise = Promise.resolve(); @@ -492,7 +497,6 @@ test('Piping from a non-empty ReadableStream to a WritableStream in the waiting t.equal(ws.state, 'waiting'); rs.pipeTo(ws); - t.equal(rs.state, 'readable', 'readable stream must say it is readable while piping'); t.equal(ws.state, 'waiting'); resolveWritePromise(); @@ -522,7 +526,6 @@ test('Piping from a non-empty ReadableStream to a WritableStream in waiting stat t.end(); } }); - t.equal(rs.state, 'readable'); let errorWritableStream; const startPromise = Promise.resolve(); @@ -539,11 +542,9 @@ test('Piping from a non-empty ReadableStream to a WritableStream in waiting stat }, close() { t.fail('Unexpected close call'); - t.end(); }, abort() { t.fail('Unexpected abort call'); - t.end(); } }); ws.write('Hello'); @@ -551,9 +552,7 @@ test('Piping from a non-empty ReadableStream to a WritableStream in waiting stat startPromise.then(() => { t.equal(ws.state, 'waiting'); - t.equal(rs.state, 'readable', 'readable stream should be readable before piping starts'); rs.pipeTo(ws); - t.equal(rs.state, 'readable', 'readable stream must say it is readable while piping'); t.equal(ws.state, 'waiting'); errorWritableStream(); @@ -563,7 +562,7 @@ test('Piping from a non-empty ReadableStream to a WritableStream in waiting stat test('Piping from a non-empty ReadableStream which becomes errored after pipeTo call to a WritableStream in the ' + 'waiting state', t => { - t.plan(10); + t.plan(6); let errorReadableStream; let pullCount = 0; @@ -580,7 +579,6 @@ test('Piping from a non-empty ReadableStream which becomes errored after pipeTo t.end(); } }); - t.equal(rs.state, 'readable'); let writeCalled = false; const startPromise = Promise.resolve(); @@ -609,13 +607,10 @@ test('Piping from a non-empty ReadableStream which becomes errored after pipeTo t.equal(ws.state, 'waiting'); t.equal(pullCount, 1); - t.equal(rs.state, 'readable', 'readable stream should be readable before piping starts'); rs.pipeTo(ws); - t.equal(rs.state, 'readable', 'readable stream must say it is readable while piping'); t.equal(ws.state, 'waiting'); errorReadableStream(); - t.equal(rs.state, 'errored'); }); }); @@ -673,7 +668,7 @@ test('Piping from a non-empty ReadableStream to a WritableStream in the waiting // Check that nothing happens before calling done(), and then call done() // to check that pipeTo is woken up. setTimeout(() => { - t.equal(pullCount, 3); + t.equal(pullCount, 2); checkSecondWrite = true; @@ -719,13 +714,12 @@ test('Piping from an empty ReadableStream to a WritableStream in the waiting sta t.equal(ws.state, 'waiting'); rs.pipeTo(ws); - t.equal(rs.state, 'readable'); t.equal(ws.state, 'waiting'); t.equal(pullCount, 1); resolveWritePromise(); setTimeout(() => { - t.equal(pullCount, 2); + t.equal(pullCount, 1); t.end(); }, 100); @@ -734,7 +728,7 @@ test('Piping from an empty ReadableStream to a WritableStream in the waiting sta test('Piping from an empty ReadableStream which becomes closed after a pipeTo call to a WritableStream in the ' + 'waiting state whose writes never complete', t => { - t.plan(5); + t.plan(4); let closeReadableStream; let pullCount = 0; @@ -780,8 +774,6 @@ test('Piping from an empty ReadableStream which becomes closed after a pipeTo ca rs.pipeTo(ws); - t.equal(rs.state, 'closed', 'the readable stream should be closed after closing it'); - setTimeout(() => { t.equal(ws.state, 'waiting', 'the writable stream should still be waiting since the write never completed'); t.equal(pullCount, 1, 'pull should have been called only once'); @@ -791,7 +783,7 @@ test('Piping from an empty ReadableStream which becomes closed after a pipeTo ca test('Piping from an empty ReadableStream which becomes errored after a pipeTo call to a WritableStream in the ' + 'waiting state', t => { - t.plan(6); + t.plan(5); let errorReadableStream; let pullCount = 0; @@ -809,7 +801,11 @@ test('Piping from an empty ReadableStream which becomes errored after a pipeTo c let writeCalled = false; const passedError = new Error('horrible things'); + const startPromise = Promise.resolve(); const ws = new WritableStream({ + start() { + return startPromise; + }, write(chunk) { if (!writeCalled) { t.equal(chunk, 'Hello'); @@ -825,20 +821,17 @@ test('Piping from an empty ReadableStream which becomes errored after a pipeTo c abort(reason) { t.equal(reason, passedError); t.assert(writeCalled); - t.equal(pullCount, 2); + t.equal(pullCount, 1); } }); ws.write('Hello'); - // Wait for ws to start. - setTimeout(() => { + startPromise.then(() => { t.equal(ws.state, 'waiting'); rs.pipeTo(ws); errorReadableStream(passedError); - - t.equal(rs.state, 'errored'); }); }); @@ -884,12 +877,11 @@ test('Piping to a stream that has been aborted passes through the error as the c const passedReason = new Error('I don\'t like you.'); ws.abort(passedReason); - rs.pipeTo(ws); - - setTimeout(() => { + rs.pipeTo(ws).catch(e => { + t.equal(e, passedReason, 'pipeTo rejection reason should be the cancellation reason'); t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason'); t.end(); - }, 10); + }); }); test('Piping to a stream and then aborting it passes through the error as the cancellation reason', t => { @@ -903,13 +895,14 @@ test('Piping to a stream and then aborting it passes through the error as the ca const ws = new WritableStream(); const passedReason = new Error('I don\'t like you.'); - rs.pipeTo(ws); + const pipeToPromise = rs.pipeTo(ws); ws.abort(passedReason); - setTimeout(() => { + pipeToPromise.catch(e => { + t.equal(e, passedReason, 'pipeTo rejection reason should be the abortion reason'); t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason'); t.end(); - }, 10); + }); }); test('Piping to a stream that has been closed propagates a TypeError cancellation reason backward', t => { @@ -923,12 +916,11 @@ test('Piping to a stream that has been closed propagates a TypeError cancellatio const ws = new WritableStream(); ws.close(); - rs.pipeTo(ws); - - setTimeout(() => { - t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason must be a TypeError'); + rs.pipeTo(ws).catch(e => { + t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError'); + t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError'); t.end(); - }, 10); + }); }); test('Piping to a stream and then closing it propagates a TypeError cancellation reason backward', t => { @@ -941,13 +933,14 @@ test('Piping to a stream and then closing it propagates a TypeError cancellation const ws = new WritableStream(); - rs.pipeTo(ws); + const pipeToPromise = rs.pipeTo(ws); ws.close(); - setTimeout(() => { - t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason must be a TypeError'); + pipeToPromise.catch(e => { + t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError'); + t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError'); t.end(); - }, 10); + }); }); test('Piping to a stream that errors on write should pass through the error as the cancellation reason', t => { diff --git a/reference-implementation/test/readable-stream-cancel.js b/reference-implementation/test/readable-stream-cancel.js index 17801a6a8..6d192de56 100644 --- a/reference-implementation/test/readable-stream-cancel.js +++ b/reference-implementation/test/readable-stream-cancel.js @@ -32,7 +32,6 @@ test('ReadableStream cancellation: integration test on an infinite stream derive readableStreamToArray(rs).then( chunks => { - t.equal(rs.state, 'closed', 'stream should be closed'); t.equal(cancellationFinished, false, 'it did not wait for the cancellation process to finish before closing'); t.ok(chunks.length > 0, 'at least one chunk should be read'); for (let i = 0; i < chunks.length; i++) { @@ -46,69 +45,11 @@ test('ReadableStream cancellation: integration test on an infinite stream derive rs.cancel().then(() => { t.equal(cancellationFinished, true, 'it returns a promise that is fulfilled when the cancellation finishes'); t.end(); - }); + }) + .catch(e => t.error(e)); }, 150); }); -test('ReadableStream cancellation: cancelling immediately should put the stream in a closed state', t => { - const rs = sequentialReadableStream(5); - - t.plan(4); - - rs.closed.then( - () => t.pass('closed promise vended before the cancellation should fulfill'), - () => t.fail('closed promise vended before the cancellation should not reject') - ); - - rs.cancel(); - - t.equal(rs.state, 'closed', 'state should be closed immediately after cancel() call'); - - rs.closed.then( - () => t.pass('closed promise vended after the cancellation should fulfill'), - () => t.fail('closed promise vended after the cancellation should not be rejected') - ); - - rs.read().then( - chunk => t.equal(chunk, ReadableStream.EOS, 'read() promise vended after the cancellation should fulfill with EOS'), - () => t.fail('read() promise vended after the cancellation should not be rejected') - ); -}); - - -test('ReadableStream cancellation: cancelling after reading should put the stream in a closed state', t => { - const rs = sequentialReadableStream(5); - - t.plan(5); - - rs.closed.then( - () => t.pass('closed promise vended before the cancellation should fulfill'), - () => t.fail('closed promise vended before the cancellation should not reject') - ); - - rs.read().then( - chunk => { - t.equal(chunk, 1, 'read() promise vended before the cancellation should fulfill with the first chunk'); - - rs.cancel(); - - t.equal(rs.state, 'closed', 'state should be closed immediately after cancel() call'); - - rs.closed.then( - () => t.pass('closed promise vended after the cancellation should fulfill'), - () => t.fail('closed promise vended after the cancellation should not be rejected') - ); - - rs.read().then( - chunk => t.equal(chunk, ReadableStream.EOS, - 'read() promise vended after the cancellation should fulfill with EOS'), - () => t.fail('read() promise vended after the cancellation should not be rejected') - ); - }, - () => t.fail('read() promise vended after the cancellation should not be rejected') - ); -}); - test('ReadableStream cancellation: cancel(reason) should pass through the given reason to the underlying source', t => { let recordedReason; const rs = new ReadableStream({ @@ -125,43 +66,6 @@ test('ReadableStream cancellation: cancel(reason) should pass through the given t.end(); }); -test('ReadableStream cancellation: cancel() on a closed stream should return a promise resolved with undefined', t => { - t.plan(2); - - const rs = new ReadableStream({ - start(enqueue, close) { - close(); - } - }); - - t.equal(rs.state, 'closed', 'state should be closed already'); - - rs.cancel().then( - v => t.equal(v, undefined, 'cancel() return value should be fulfilled with undefined'), - () => t.fail('cancel() return value should not be rejected') - ); -}); - -test('ReadableStream cancellation: cancel() on an errored stream should return a promise rejected with the error', - t => { - t.plan(2); - - const passedError = new Error('aaaugh!!'); - - const rs = new ReadableStream({ - start(enqueue, close, error) { - error(passedError); - } - }); - - t.equal(rs.state, 'errored', 'state should be errored already'); - - rs.cancel().then( - () => t.fail('cancel() return value should not be fulfilled'), - r => t.equal(r, passedError, 'cancel() return value should be rejected with passedError') - ); -}); - test('ReadableStream cancellation: returning a value from the underlying source\'s cancel should not affect the ' + 'fulfillment value of the promise returned by the stream\'s cancel', t => { t.plan(1); diff --git a/reference-implementation/test/readable-stream-reader.js b/reference-implementation/test/readable-stream-reader.js new file mode 100644 index 000000000..c886d36fc --- /dev/null +++ b/reference-implementation/test/readable-stream-reader.js @@ -0,0 +1,208 @@ +const test = require('tape-catch'); + +let ReadableStreamReader; + +test('Can get the ReadableStreamReader constructor indirectly', t => { + t.doesNotThrow(() => { + // It's not exposed globally, but we test a few of its properties here. + ReadableStreamReader = (new ReadableStream()).getReader().constructor; + }); + t.end(); +}); + +test('Constructing an ReadableStreamReader directly should fail if the stream is already locked (via direct ' + + 'construction)', t => { + const rs = new ReadableStream(); + t.doesNotThrow(() => new ReadableStreamReader(rs), 'constructing directly the first time should be fine'); + t.throws(() => new ReadableStreamReader(rs), /TypeError/, 'constructing directly the second time should fail'); + t.end(); +}); + +test('Getting an ReadableStreamReader via getReader should fail if the stream is already locked (via direct ' + + 'construction', t => { + const rs = new ReadableStream(); + t.doesNotThrow(() => new ReadableStreamReader(rs), 'constructing directly should be fine'); + t.throws(() => rs.getReader(), /TypeError/, 'getReader() should fail'); + t.end(); +}); + +test('Constructing an ReadableStreamReader directly should fail if the stream is already locked (via getReader)', + t => { + const rs = new ReadableStream(); + t.doesNotThrow(() => rs.getReader(), 'getReader() should be fine'); + t.throws(() => new ReadableStreamReader(rs), /TypeError/, 'constructing directly should fail'); + t.end(); +}); + +test('Constructing an ReadableStreamReader directly should fail if the stream is already closed', + t => { + const rs = new ReadableStream({ + start(enqueue, close) { + close(); + } + }); + + t.throws(() => new ReadableStreamReader(rs), /TypeError/, 'constructing directly should fail'); + t.end(); +}); + +test('Constructing an ReadableStreamReader directly should fail if the stream is already errored', + t => { + const theError = new Error('don\'t say i didn\'t warn ya'); + const rs = new ReadableStream({ + start(enqueue, close, error) { + error(theError); + } + }); + + t.throws(() => new ReadableStreamReader(rs), /don't say i didn't warn ya/, 'getReader() threw the error'); + t.end(); +}); + +test('Reading from a reader for an empty stream will wait until a chunk is available', t => { + let enqueue; + const rs = new ReadableStream({ + start(e) { + enqueue = e; + } + }); + const reader = rs.getReader(); + + t.equal(reader.isActive, true, 'reader is active to start with'); + + reader.read().then(result => { + t.deepEqual(result, { value: 'a', done: false }, 'read() should fulfill with the enqueued chunk'); + t.equal(reader.isActive, true, 'reader is still active'); + t.end(); + }); + + enqueue('a'); +}); + +test('cancel() on a reader releases the reader before calling through', t => { + t.plan(3); + + const passedReason = new Error('it wasn\'t the right time, sorry'); + const rs = new ReadableStream({ + cancel(reason) { + t.equal(reader.isActive, false, 'reader should be released by the time underlying source cancel is called'); + t.equal(reason, passedReason, 'the cancellation reason is passed through to the underlying source'); + } + }); + + const reader = rs.getReader(); + reader.cancel(passedReason).then( + () => t.pass('reader.cancel() should fulfill'), + e => t.fail('reader.cancel() should not reject') + ); +}); + +test('closed should be fulfilled after stream is closed (stream .closed access before acquiring)', t => { + t.plan(2); + + let doClose; + const rs = new ReadableStream({ + start(enqueue, close) { + doClose = close; + } + }); + + rs.closed.then(() => { + t.equal(reader.isActive, false, 'reader is no longer active when stream closed is fulfilled'); + }); + + const reader = rs.getReader(); + doClose(); + + reader.closed.then(() => { + t.equal(reader.isActive, false, 'reader is no longer active when reader closed is fulfilled'); + }); +}); + +test('closed should be fulfilled after reader releases its lock (multiple stream locks)', t => { + t.plan(6); + + let doClose; + const rs = new ReadableStream({ + start(enqueue, close) { + doClose = close; + } + }); + + const reader1 = rs.getReader(); + + rs.closed.then(() => { + t.equal(reader1.isActive, false, 'reader1 is no longer active when stream closed is fulfilled'); + t.equal(reader2.isActive, false, 'reader2 is no longer active when stream closed is fulfilled'); + }); + + reader1.releaseLock(); + + const reader2 = rs.getReader(); + doClose(); + + reader1.closed.then(() => { + t.equal(reader1.isActive, false, 'reader1 is no longer active when reader1 closed is fulfilled'); + t.equal(reader2.isActive, false, 'reader2 is no longer active when reader1 closed is fulfilled'); + }); + + reader2.closed.then(() => { + t.equal(reader1.isActive, false, 'reader1 is no longer active when reader2 closed is fulfilled'); + t.equal(reader2.isActive, false, 'reader2 is no longer active when reader2 closed is fulfilled'); + }); +}); + +test('Multiple readers can access the stream in sequence', t => { + const rs = new ReadableStream({ + start(enqueue, close) { + enqueue('a'); + enqueue('b'); + close(); + } + }); + + const reader1 = rs.getReader(); + reader1.read().then(r => t.deepEqual(r, { value: 'a', done: false }, 'reading the first chunk from reader1 works')); + reader1.releaseLock(); + + const reader2 = rs.getReader(); + reader2.read().then(r => t.deepEqual(r, { value: 'b', done: false }, 'reading the second chunk from reader2 works')); + reader2.releaseLock(); + + t.end(); +}); + +test('Cannot use an already-released reader to unlock a stream again', t => { + t.plan(2); + + const rs = new ReadableStream(); + + const reader1 = rs.getReader(); + reader1.releaseLock(); + + const reader2 = rs.getReader(); + t.equal(reader2.isActive, true, 'reader2 state is active before releasing reader1'); + + reader1.releaseLock(); + t.equal(reader2.isActive, true, 'reader2 state is still active after releasing reader1 again'); +}); + +test('cancel() on a released reader is a no-op and does not pass through', t => { + const rs = new ReadableStream({ + start(enqueue) { + enqueue('a'); + }, + cancel() { + t.fail('underlying source cancel should not be called'); + } + }); + + const reader = rs.getReader(); + reader.releaseLock(); + reader.cancel().then(v => t.equal(v, undefined, 'cancel() on the reader should fulfill with undefined')); + + const reader2 = rs.getReader(); + reader2.read().then(r => t.deepEqual(r, { value: 'a', done: false }, 'a new reader should be able to read a chunk')); + + setTimeout(() => t.end(), 50); +}); diff --git a/reference-implementation/test/readable-stream-templated.js b/reference-implementation/test/readable-stream-templated.js new file mode 100644 index 000000000..4bed82072 --- /dev/null +++ b/reference-implementation/test/readable-stream-templated.js @@ -0,0 +1,117 @@ +import templatedRSEmpty from './templated/readable-stream-empty'; +import templatedRSClosed from './templated/readable-stream-closed'; +import templatedRSErrored from './templated/readable-stream-errored'; +import templatedRSErroredSyncOnly from './templated/readable-stream-errored-sync-only'; +import templatedRSEmptyReader from './templated/readable-stream-empty-reader'; +import templatedRSClosedReader from './templated/readable-stream-closed-reader'; +import templatedRSErroredReader from './templated/readable-stream-errored-reader'; +import templatedRSTwoChunksOpenReader from './templated/readable-stream-two-chunks-open-reader'; +import templatedRSTwoChunksClosedReader from './templated/readable-stream-two-chunks-closed-reader'; + +templatedRSEmpty('ReadableStream (empty)', + () => new ReadableStream() +); + +templatedRSEmptyReader('ReadableStream (empty) reader', + () => streamAndDefaultReader(new ReadableStream()) +); + +templatedRSClosed('ReadableStream (closed via call in start)', + () => new ReadableStream({ + start(enqueue, close) { close(); } + }) +); + +templatedRSClosedReader('ReadableStream (closed via call in start) reader', + () => { + let doClose; + const stream = new ReadableStream({ + start(enqueue, close) { + doClose = close; + } + }); + const result = streamAndDefaultReader(stream); + doClose(); + return result; + } +); + +templatedRSClosed('ReadableStream (closed via cancel)', + () => { + const stream = new ReadableStream(); + stream.cancel(); + return stream; + } +); + +templatedRSClosedReader('ReadableStream (closed via cancel) reader', + () => { + const stream = new ReadableStream(); + const result = streamAndDefaultReader(stream); + stream.cancel(); + return result; + } +); + +const theError = new Error('boo!'); + +templatedRSErroredSyncOnly('ReadableStream (errored via call in start)', + () => new ReadableStream({ + start(enqueue, close, error) { error(theError); } + }), + theError +); + +templatedRSErrored('ReadableStream (errored via call in start)', + () => new ReadableStream({ + start(enqueue, close, error) { error(theError); } + }), + theError +); + +templatedRSErrored('ReadableStream (errored via returning a rejected promise in start)', + () => new ReadableStream({ + start(enqueue, close, error) { return Promise.reject(theError); } + }), + theError +); + +templatedRSErroredReader('ReadableStream (errored via returning a rejected promise in start) reader', + () => streamAndDefaultReader(new ReadableStream({ + start(enqueue, close, error) { return Promise.reject(theError); } + })), + theError +); + +const chunks = ['a', 'b']; + +templatedRSTwoChunksOpenReader('ReadableStream (two chunks enqueued, still open) reader', + () => streamAndDefaultReader(new ReadableStream({ + start(enqueue) { + enqueue(chunks[0]); + enqueue(chunks[1]); + } + })), + chunks +); + +templatedRSTwoChunksClosedReader('ReadableStream (two chunks enqueued, then closed) reader', + () => { + let doClose; + const stream = new ReadableStream({ + start(enqueue, close) { + enqueue(chunks[0]); + enqueue(chunks[1]); + doClose = close; + } + }); + const result = streamAndDefaultReader(stream); + doClose(); + return result; + }, + chunks +); + +function streamAndDefaultReader(stream) { + return { stream: stream, reader: stream.getReader() }; +} diff --git a/reference-implementation/test/readable-stream.js b/reference-implementation/test/readable-stream.js index 584e25062..d3e667683 100644 --- a/reference-implementation/test/readable-stream.js +++ b/reference-implementation/test/readable-stream.js @@ -9,222 +9,85 @@ test('ReadableStream can be constructed with no arguments', t => { t.end(); }); -// Traceur-troubles, skip for now -test.skip('ReadableStream has an EOS static property', t => { - const props = Object.getOwnPropertyNames(ReadableStream); - t.deepEqual(props, ['EOS']); - - const propDesc = Object.getOwnPropertyDescriptor(ReadableStream, 'EOS'); - t.equal(propDesc.enumerable, false); - t.equal(propDesc.writable, false); - t.equal(propDesc.configurable, false); - t.equal(typeof propDesc.value, 'symbol'); - t.equal(String(propDesc.value), 'ReadableStream.EOS'); - - t.end(); -}); - -test('ReadableStream instances have the correct methods and properties', t => { - const rs = new ReadableStream(); - - t.equal(typeof rs.read, 'function', 'has a read method'); - t.equal(typeof rs.cancel, 'function', 'has a cancel method'); - t.equal(typeof rs.pipeTo, 'function', 'has a pipeTo method'); - t.equal(typeof rs.pipeThrough, 'function', 'has a pipeThrough method'); - - t.equal(rs.state, 'readable', 'state starts out readable'); +test('ReadableStream: if start throws an error, it should be re-thrown', t => { + t.plan(1); - t.ok(rs.closed, 'has a closed property'); - t.ok(rs.closed.then, 'closed property is thenable'); + const error = new Error('aaaugh!!'); - t.end(); + t.throwsExactly(() => new ReadableStream({ start() { throw error; } }), error, 'error should be re-thrown'); }); -test('ReadableStream: immediately closing should put the stream in a closed state and fulfill closed with undefined', - t => { +test('ReadableStream: if pull rejects, it should error the stream', t => { t.plan(2); + const error = new Error('pull failure'); const rs = new ReadableStream({ - start(enqueue, close) { - close(); + pull() { + return Promise.reject(error); } }); - t.equal(rs.state, 'closed', 'The stream should be in closed state'); - - rs.closed.then( - v => t.equal(v, undefined, 'closed should fulfill with undefined'), - () => t.fail('closed should not reject') - ); -}); - -test('ReadableStream: leaving a stream empty leaves it in a readable state, causing read() to never settle', t => { - const rs = new ReadableStream(); - t.equal(rs.state, 'readable'); - - rs.read().then( - () => t.fail('read() should not fulfill'), - () => t.fail('read() should not reject') - ); + rs.closed.catch(e => { + t.equal(e, error, 'closed should reject with the thrown error'); + }); - setTimeout(() => t.end(), 100); + rs.getReader().read().catch(e => { + t.equal(e, error, 'read() should reject with the thrown error'); + }); }); -test('ReadableStream: reading a closed stream fulfills with EOS', t => { - t.plan(1); +test('ReadableStream: calling close twice should be a no-op', t => { + t.plan(2); - const rs = new ReadableStream({ + new ReadableStream({ start(enqueue, close) { close(); + t.doesNotThrow(close); } - }); - - rs.read().then( - v => t.equal(v, ReadableStream.EOS, 'read() should return a promise fulfilled with EOS'), - () => t.fail('read() should not return a rejected promise') - ); + }) + .closed.then(() => t.pass('closed should fulfill')); }); -test('ReadableStream: reading an errored stream rejects with the stored error', t => { +test('ReadableStream: calling error twice should be a no-op', t => { t.plan(2); - const passedError = new Error('aaaugh!!'); - const rs = new ReadableStream({ + const theError = new Error('boo!'); + const error2 = new Error('not me!'); + new ReadableStream({ start(enqueue, close, error) { - error(passedError); + error(theError); + t.doesNotThrow(() => error(error2)); } - }); - - t.equal(rs.state, 'errored'); - - rs.read().then( - () => t.fail('read() should not fulfill'), - e => t.equal(e, passedError, 'read() should reject with the passed error') - ); -}); - -test('ReadableStream: reading a forever-empty stream while a read is still ongoing rejects', t => { - t.plan(1); - - const rs = new ReadableStream(); - - rs.read().then( - () => t.fail('first read() should not fulfill'), - e => t.fail('first read() should not reject') - ); - - rs.read().then( - () => t.fail('second read() should not fulfill'), - e => t.equal(e.constructor, TypeError, 'second read() should reject with a TypeError') - ); -}); - -test('ReadableStream: reading a nonempty stream while a read is still ongoing rejects', t => { - t.plan(2); - - const rs = new ReadableStream({ - start(enqueue) { - enqueue('a'); - enqueue('b'); - } - }); - - rs.read().then( - v => t.equal(v, 'a', 'first read() should fulfill with the first chunk'), - e => t.fail('first read() should not reject') - ); - - rs.read().then( - () => t.fail('second read() should not fulfill'), - e => t.equal(e.constructor, TypeError, 'second read() should reject with a TypeError') - ); + }) + .closed.catch(e => t.equal(e, theError, 'closed should reject with the first error')); }); -test('ReadableStream: reading a nonempty stream with appropriate waiting works fine', t => { +test('ReadableStream: calling error after close should be a no-op', t => { t.plan(2); - const rs = new ReadableStream({ - start(enqueue) { - enqueue('a'); - enqueue('b'); - } - }); - - rs.read() - .then( - v => { - t.equal(v, 'a', 'first read() should fulfill with the first chunk'); - return rs.read(); - }, - e => t.fail('first read() should not reject') - ) - .then( - v => t.equal(v, 'b', 'second read() should fulfill with the second chunk'), - e => t.fail('second read() should not reject') - ); -}); - -test('ReadableStream: reading a nonempty stream to the end works fine', t => { - t.plan(3); - - const rs = new ReadableStream({ - start(enqueue, close) { - enqueue('a'); - enqueue('b'); + new ReadableStream({ + start(enqueue, close, error) { close(); + t.doesNotThrow(error); } - }); - - rs.read() - .then( - v => { - t.equal(v, 'a', 'first read() should fulfill with the first chunk'); - return rs.read(); - }, - e => t.fail('first read() should not reject') - ) - .then( - v => { - t.equal(v, 'b', 'second read() should fulfill with the second chunk'); - return rs.read(); - }, - e => t.fail('second read() should not reject') || t.error(e) - ) - .then( - v => t.equal(v, ReadableStream.EOS, 'third read() should fulfill with EOS'), - e => t.fail('third read() should not reject') - ); + }) + .closed.then(() => t.pass('closed should fulfill')); }); -test('ReadableStream: draining a stream via read() causes the closed promise to fulfill', t => { - t.plan(5); +test('ReadableStream: calling close after error should be a no-op', t => { + t.plan(2); - const rs = new ReadableStream({ - start(enqueue, close) { - enqueue('test'); - close(); + const theError = new Error('boo!'); + new ReadableStream({ + start(enqueue, close, error) { + error(theError); + t.doesNotThrow(close); } - }); - - t.equal(rs.state, 'readable', 'The stream should be in readable state to start with'); - - rs.read().then( - v => { - t.equal(v, 'test', 'the enqueued chunk should be read'); - t.equal(rs.state, 'closed', 'the stream should still be in a closed state'); - }, - e => t.fail('read() should not reject') - ); - - t.equal(rs.state, 'closed', 'The stream should be in a closed state immediately after reading'); - - rs.closed.then( - v => t.equal(v, undefined, 'closed should fulfill with undefined'), - () => t.fail('closed should not reject') - ); + }) + .closed.catch(e => t.equal(e, theError, 'closed should reject with the first error')); }); -test('ReadableStream: should only call underlying source pull() once upon starting the stream', t => { +test('ReadableStream: should only call pull once upon starting the stream', t => { t.plan(2); let pullCount = 0; @@ -245,8 +108,7 @@ test('ReadableStream: should only call underlying source pull() once upon starti setTimeout(() => t.equal(pullCount, 1, 'pull should be called exactly once'), 50); }); -test('ReadableStream: should only call underlying source pull() once on a forever-empty stream, even after reading', - t => { +test('ReadableStream: should only call pull once for a forever-empty stream, even after reading', t => { t.plan(2); let pullCount = 0; @@ -264,13 +126,12 @@ test('ReadableStream: should only call underlying source pull() once on a foreve t.equal(pullCount, 1, 'pull should be called once start finishes'); }); - rs.read(); + rs.getReader().read(); setTimeout(() => t.equal(pullCount, 1, 'pull should be called exactly once'), 50); }); -test('ReadableStream: should only call underlying source pull() once on a non-empty stream read from before start ' + - 'fulfills', t => { +test('ReadableStream: should only call pull once on a non-empty stream read from before start fulfills', t => { t.plan(5); let pullCount = 0; @@ -289,8 +150,8 @@ test('ReadableStream: should only call underlying source pull() once on a non-em t.equal(pullCount, 1, 'pull should be called once start finishes'); }); - rs.read().then(v => { - t.equal(v, 'a', 'first read() should return first chunk'); + rs.getReader().read().then(r => { + t.deepEqual(r, { value: 'a', done: false }, 'first read() should return first chunk'); t.equal(pullCount, 1, 'pull should not have been called again'); }); @@ -299,8 +160,7 @@ test('ReadableStream: should only call underlying source pull() once on a non-em setTimeout(() => t.equal(pullCount, 1, 'pull should be called exactly once'), 50); }); -test('ReadableStream: should only call underlying source pull() twice on a non-empty stream read from after start ' + - 'fulfills', t => { +test('ReadableStream: should only call pull twice on a non-empty stream read from after start fulfills', t => { t.plan(5); let pullCount = 0; @@ -318,8 +178,8 @@ test('ReadableStream: should only call underlying source pull() twice on a non-e startPromise.then(() => { t.equal(pullCount, 1, 'pull should be called once start finishes'); - rs.read().then(v => { - t.equal(v, 'a', 'first read() should return first chunk'); + rs.getReader().read().then(r => { + t.deepEqual(r, { value: 'a', done: false }, 'first read() should return first chunk'); t.equal(pullCount, 2, 'pull should be called again once read fulfills'); }); }); @@ -329,199 +189,128 @@ test('ReadableStream: should only call underlying source pull() twice on a non-e setTimeout(() => t.equal(pullCount, 2, 'pull should be called exactly twice'), 50); }); -test('ReadableStream: should call underlying source pull() in reaction to read()ing the last chunk', t => { - t.plan(6); +test('ReadableStream: should call pull in reaction to read()ing the last chunk, if not draining', t => { + t.plan(4); let pullCount = 0; + let doEnqueue; const startPromise = Promise.resolve(); + const pullPromise = Promise.resolve(); const rs = new ReadableStream({ - start() { + start(enqueue) { + doEnqueue = enqueue; return startPromise; }, - pull(enqueue) { - enqueue(++pullCount); - } - }); - - startPromise.then(() => { - t.equal(pullCount, 1, 'pull should be called once start finishes'); - - return rs.read(); - }) - .then(v => { - t.equal(v, 1, 'first read() should return first chunk'); - t.equal(pullCount, 2, 'pull should be called in reaction to reading'); - return rs.read(); - }) - .then(v => { - t.equal(v, 2, 'second read() should return second chunk'); - t.equal(pullCount, 3, 'pull should be called in reaction to reading, again'); - }); - - setTimeout(() => t.equal(pullCount, 3, 'pull should be called exactly thrice'), 50); -}); - -test('ReadableStream: if start throws an error, it should be re-thrown', t => { - t.plan(1); - - const error = new Error('aaaugh!!'); - - try { - new ReadableStream({ start() { throw error; } }); - t.fail('Constructor didn\'t throw'); - } catch (caughtError) { - t.equal(caughtError, error, 'error was allowed to propagate'); - } -}); - -test('ReadableStream: if pull throws an error, it should error the stream', t => { - t.plan(5); - - const error = new Error('aaaugh!!'); - const rs = new ReadableStream({ pull() { - throw error; + ++pullCount; + return pullPromise; } }); - t.equal(rs.state, 'readable', 'state should start out "readable" since pull isn\'t called immediately'); + const reader = rs.getReader(); - rs.closed.catch(e => { - t.equal(rs.state, 'errored', 'state should be "errored" in closed catch'); - t.equal(e, error, 'closed should reject with the thrown error'); - }); + startPromise.then(() => { + t.equal(pullCount, 1, 'pull should have been called once after read'); - rs.read().catch(e => { - t.equal(rs.state, 'errored', 'state should be "errored" in read() catch'); - t.equal(e, error, 'read() should reject with the thrown error'); - }); -}); + doEnqueue('a'); -test('ReadableStream: if pull rejects, it should error the stream', t => { - t.plan(5); + return pullPromise.then(() => { + t.equal(pullCount, 2, 'pull should have been called a second time after enqueue'); - const error = new Error('pull failure'); - const rs = new ReadableStream({ - pull() { - return Promise.reject(error); - } - }); - - t.equal(rs.state, 'readable', 'state should start out "readable" since pull isn\'t called immediately'); - - rs.closed.catch(e => { - t.equal(rs.state, 'errored', 'state should be "errored" in closed catch'); - t.equal(e, error, 'closed should reject with the thrown error'); - }); + return reader.read().then(() => { + t.equal(pullCount, 3, 'pull should have been called a third time after read'); + }); + }); + }) + .catch(e => t.error(e)); - rs.read().catch(e => { - t.equal(rs.state, 'errored', 'state should be "errored" in read() catch'); - t.equal(e, error, 'read() should reject with the thrown error'); - }); + setTimeout(() => t.equal(pullCount, 3, 'pull should be called exactly thrice'), 50); }); -test('ReadableStream integration test: adapting a random push source', t => { - let pullChecked = false; - const randomSource = new RandomPushSource(8); +test('ReadableStream: should not call pull() in reaction to read()ing the last chunk, if draining', t => { + t.plan(4); + let pullCount = 0; + let doEnqueue; + let doClose; + const startPromise = Promise.resolve(); + const pullPromise = Promise.resolve(); const rs = new ReadableStream({ - start(enqueue, close, error) { - t.equal(typeof enqueue, 'function', 'enqueue should be a function in start'); - t.equal(typeof close, 'function', 'close should be a function in start'); - t.equal(typeof error, 'function', 'error should be a function in start'); - - randomSource.ondata = chunk => { - if (!enqueue(chunk)) { - randomSource.readStop(); - } - }; - - randomSource.onend = close; - randomSource.onerror = error; + start(enqueue, close) { + doEnqueue = enqueue; + doClose = close; + return startPromise; }, - - pull(enqueue, close) { - if (!pullChecked) { - pullChecked = true; - t.equal(typeof enqueue, 'function', 'enqueue should be a function in pull'); - t.equal(typeof close, 'function', 'close should be a function in pull'); - } - - randomSource.readStart(); + pull() { + ++pullCount; + return pullPromise; } }); - readableStreamToArray(rs).then( - chunks => { - t.equal(rs.state, 'closed', 'stream should be closed after all chunks are read'); - t.equal(chunks.length, 8, '8 chunks should be read'); - for (let i = 0; i < chunks.length; i++) { - t.equal(chunks[i].length, 128, `chunk ${i + 1} should have 128 bytes`); - } + const reader = rs.getReader(); - t.end(); - }, - e => t.error(e) - ); -}); + startPromise.then(() => { + t.equal(pullCount, 1, 'pull should have been called once after read'); -test('ReadableStream integration test: adapting a sync pull source', t => { - const rs = sequentialReadableStream(10); + doEnqueue('a'); - readableStreamToArray(rs).then(chunks => { - t.equal(rs.state, 'closed', 'stream should be closed after all chunks are read'); - t.equal(rs.source.closed, true, 'source should be closed after all chunks are read'); - t.deepEqual(chunks, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 'the expected 10 chunks should be read'); - - t.end(); - }); -}); + return pullPromise.then(() => { + t.equal(pullCount, 2, 'pull should have been called a second time after enqueue'); -test('ReadableStream integration test: adapting an async pull source', t => { - const rs = sequentialReadableStream(10, { async: true }); + doClose(); - readableStreamToArray(rs).then(chunks => { - t.equal(rs.state, 'closed', 'stream should be closed after all chunks are read'); - t.equal(rs.source.closed, true, 'source should be closed after all chunks are read'); - t.deepEqual(chunks, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 'the expected 10 chunks should be read'); + return reader.read().then(() => { + t.equal(pullCount, 2, 'pull should not have been called a third time after read'); + }); + }); + }) + .catch(e => t.error(e)); - t.end(); - }); + setTimeout(() => t.equal(pullCount, 2, 'pull should be called exactly twice'), 50); }); test('ReadableStream: should not call pull until the previous pull call\'s promise fulfills', t => { let resolve; let returnedPromise; let timesCalled = 0; + const startPromise = Promise.resolve(); const rs = new ReadableStream({ + start(enqueue) { + enqueue('a'); + return startPromise; + }, pull(enqueue) { ++timesCalled; - enqueue(timesCalled); returnedPromise = new Promise(r => { resolve = r; }); return returnedPromise; } }); + const reader = rs.getReader(); - rs.read().then(chunk1 => { - t.equal(timesCalled, 1, 'pull should not yet have been called a second time'); - t.equal(chunk1, 1, 'read() should fulfill with the enqueued value'); + startPromise.then(() => + reader.read().then(result1 => { + t.equal(timesCalled, 1, + 'pull should have been called once after start, but not yet have been called a second time'); + t.deepEqual(result1, { value: 'a', done: false }, 'read() should fulfill with the enqueued value'); - setTimeout(() => { - t.equal(timesCalled, 1, 'after 30 ms, pull should still only have been called once'); + setTimeout(() => { + t.equal(timesCalled, 1, 'after 30 ms, pull should still only have been called once'); - resolve(); + resolve(); - returnedPromise.then(() => { - t.equal(timesCalled, 2, 'after the promise returned by pull is fulfilled, pull should be called a second time'); - t.end(); - }); - }, 30); - }); + returnedPromise.then(() => { + t.equal(timesCalled, 2, + 'after the promise returned by pull is fulfilled, pull should be called a second time'); + t.end(); + }); + }, 30); + }) + ) + .catch(e => t.error(e)); }); test('ReadableStream: should pull after start, and after every read', t => { let timesCalled = 0; - const startPromise = Promise.resolve(); const rs = new ReadableStream({ start(enqueue) { @@ -542,17 +331,17 @@ test('ReadableStream: should pull after start, and after every read', t => { } } }); + const reader = rs.getReader(); - // Wait for start to finish startPromise.then(() => { - return rs.read().then(chunk1 => { - t.equal(chunk1, 'a', 'first chunk should be as expected'); + return reader.read().then(result1 => { + t.deepEqual(result1, { value: 'a', done: false }, 'first chunk should be as expected'); - return rs.read().then(chunk2 => { - t.equal(chunk2, 'b', 'second chunk should be as expected'); + return reader.read().then(result2 => { + t.deepEqual(result2, { value: 'b', done: false }, 'second chunk should be as expected'); - return rs.read().then(chunk3 => { - t.equal(chunk3, 'c', 'third chunk should be as expected'); + return reader.read().then(result3 => { + t.deepEqual(result3, { value: 'c', done: false }, 'third chunk should be as expected'); setTimeout(() => { // Once for after start, and once for every read. @@ -566,43 +355,29 @@ test('ReadableStream: should pull after start, and after every read', t => { .catch(e => t.error(e)); }); -test('ReadableStream strategies: the default strategy should return false for all but the first enqueue call', t => { - t.plan(5); - - new ReadableStream({ - start(enqueue) { - t.equal(enqueue('a'), true, 'first enqueue should return true'); - t.equal(enqueue('b'), false, 'second enqueue should return false'); - t.equal(enqueue('c'), false, 'third enqueue should return false'); - t.equal(enqueue('d'), false, 'fourth enqueue should return false'); - t.equal(enqueue('e'), false, 'fifth enqueue should return false'); - } - }); -}); - -test('ReadableStream strategies: the default strategy should continue returning true from enqueue if the chunks are ' + - 'read immediately', t => { - let doEnqueue; +test('ReadableStream: should not call pull after start if the stream is now closed', t => { + let timesCalled = 0; + const startPromise = Promise.resolve(); const rs = new ReadableStream({ - start(enqueue) { - doEnqueue = enqueue; + start(enqueue, close) { + enqueue('a'); + close(); + return startPromise; + }, + pull() { + ++timesCalled; } }); - t.equal(doEnqueue('a'), true, 'first enqueue should return true'); - - rs.read().then(chunk1 => { - t.equal(chunk1, 'a', 'first chunk read should be correct'); - t.equal(doEnqueue('b'), true, 'second enqueue should return true'); - - return rs.read().then(chunk2 => { - t.equal(chunk2, 'b', 'second chunk read should be correct'); - t.equal(doEnqueue('c'), true, 'third enqueue should return true'); + startPromise.then(() => { + t.equal(timesCalled, 0, 'after start finishes, pull should not have been called'); - return rs.read().then(chunk3 => { - t.equal(chunk3, 'c', 'third chunk read should be correct'); - t.equal(doEnqueue('d'), true, 'fourth enqueue should return true'); + const reader = rs.getReader(); + return reader.read().then(() => { + t.equal(timesCalled, 0, 'reading should not have triggered a pull call'); + return rs.closed.then(() => { + t.equal(timesCalled, 0, 'stream should have closed with still no calls to pull'); t.end(); }); }); @@ -610,8 +385,40 @@ test('ReadableStream strategies: the default strategy should continue returning .catch(e => t.error(e)); }); +test('ReadableStream: should call pull after enqueueing from inside pull (with no read requests), if strategy allows', + t => { + let timesCalled = 0; + const startPromise = Promise.resolve(); + const rs = new ReadableStream({ + start() { + return startPromise; + }, + pull(enqueue) { + enqueue(++timesCalled); + }, + strategy: { + size() { + return 1; + }, + shouldApplyBackpressure(size) { + return size > 3; + } + } + }); + + startPromise.then(() => { + // after start: size = 0, pull() + // after enqueue(1): size = 1, pull() + // after enqueue(2): size = 2, pull() + // after enqueue(3): size = 3, pull() + // after enqueue(4): size = 4, do not pull + t.equal(timesCalled, 4, 'pull() should have been called four times'); + t.end(); + }); +}); + test('ReadableStream: enqueue should throw when the stream is readable but draining', t => { - t.plan(4); + t.plan(2); const rs = new ReadableStream({ start(enqueue, close) { @@ -625,14 +432,10 @@ test('ReadableStream: enqueue should throw when the stream is readable but drain ); } }); - - t.equal(rs.state, 'readable', 'state should start readable'); - rs.read(); - t.equal(rs.state, 'closed', 'state should become closed immediately after reading'); }); test('ReadableStream: enqueue should throw when the stream is closed', t => { - t.plan(2); + t.plan(1); const rs = new ReadableStream({ start(enqueue, close) { @@ -645,12 +448,10 @@ test('ReadableStream: enqueue should throw when the stream is closed', t => { ); } }); - - t.equal(rs.state, 'closed', 'state should be closed immediately after creation'); }); test('ReadableStream: enqueue should throw the stored error when the stream is errored', t => { - t.plan(2); + t.plan(1); const expectedError = new Error('i am sad'); const rs = new ReadableStream({ @@ -664,48 +465,8 @@ test('ReadableStream: enqueue should throw the stored error when the stream is e ); } }); - - t.equal(rs.state, 'errored', 'state should be errored immediately after creation'); }); -test('ReadableStream: cancel() and closed on a closed stream should return the same promise', t => { - const rs = new ReadableStream({ - start(enqueue, close) { - close(); - } - }); - - t.equal(rs.cancel(), rs.closed, 'the promises returned should be the same'); - t.end(); -}); - -test('ReadableStream: cancel() and closed on an errored stream should return the same promise', t => { - const rs = new ReadableStream({ - start(enqueue, close, error) { - error(new Error('boo!')); - } - }); - - t.equal(rs.cancel(), rs.closed, 'the promises returned should be the same'); - t.end(); -}); - -test('ReadableStream: read() returns fresh promises each call (empty stream)', t => { - const rs = new ReadableStream(); - t.notEqual(rs.read(), rs.read(), 'the promises returned should be different'); - t.end(); -}); - -test('ReadableStream: read() returns fresh promises each call (stream with a chunk)', t => { - const rs = new ReadableStream({ - start(enqueue) { - enqueue('a'); - } - }); - - t.notEqual(rs.read(), rs.read(), 'the promises returned should be different'); - t.end(); -}); test('ReadableStream: should call underlying source methods as methods', t => { t.plan(6); @@ -735,5 +496,118 @@ test('ReadableStream: should call underlying source methods as methods', t => { theSource.debugName = 'the source object passed to the constructor'; // makes test failures easier to diagnose const rs = new ReadableStream(theSource); - rs.read().then(() => rs.cancel()); + rs.getReader().read().then(() => rs.cancel()); +}); + +test('ReadableStream strategies: the default strategy should return false for all but the first enqueue call', t => { + t.plan(5); + + new ReadableStream({ + start(enqueue) { + t.equal(enqueue('a'), true, 'first enqueue should return true'); + t.equal(enqueue('b'), false, 'second enqueue should return false'); + t.equal(enqueue('c'), false, 'third enqueue should return false'); + t.equal(enqueue('d'), false, 'fourth enqueue should return false'); + t.equal(enqueue('e'), false, 'fifth enqueue should return false'); + } + }); +}); + +test('ReadableStream strategies: the default strategy should continue returning true from enqueue if the chunks are ' + + 'read immediately', t => { + let doEnqueue; + const rs = new ReadableStream({ + start(enqueue) { + doEnqueue = enqueue; + } + }); + const reader = rs.getReader(); + + t.equal(doEnqueue('a'), true, 'first enqueue should return true'); + + reader.read().then(result1 => { + t.deepEqual(result1, { value: 'a', done: false }, 'first chunk read should be correct'); + t.equal(doEnqueue('b'), true, 'second enqueue should return true'); + + return reader.read(); + }) + .then(result2 => { + t.deepEqual(result2, { value: 'b', done: false }, 'second chunk read should be correct'); + t.equal(doEnqueue('c'), true, 'third enqueue should return true'); + + return reader.read(); + }) + .then(result3 => { + t.deepEqual(result3, { value: 'c', done: false }, 'third chunk read should be correct'); + t.equal(doEnqueue('d'), true, 'fourth enqueue should return true'); + + t.end(); + }) + .catch(e => t.error(e)); +}); + +test('ReadableStream integration test: adapting a random push source', t => { + let pullChecked = false; + const randomSource = new RandomPushSource(8); + + const rs = new ReadableStream({ + start(enqueue, close, error) { + t.equal(typeof enqueue, 'function', 'enqueue should be a function in start'); + t.equal(typeof close, 'function', 'close should be a function in start'); + t.equal(typeof error, 'function', 'error should be a function in start'); + + randomSource.ondata = chunk => { + if (!enqueue(chunk)) { + randomSource.readStop(); + } + }; + + randomSource.onend = close; + randomSource.onerror = error; + }, + + pull(enqueue, close) { + if (!pullChecked) { + pullChecked = true; + t.equal(typeof enqueue, 'function', 'enqueue should be a function in pull'); + t.equal(typeof close, 'function', 'close should be a function in pull'); + } + + randomSource.readStart(); + } + }); + + readableStreamToArray(rs).then( + chunks => { + t.equal(chunks.length, 8, '8 chunks should be read'); + for (let i = 0; i < chunks.length; i++) { + t.equal(chunks[i].length, 128, `chunk ${i + 1} should have 128 bytes`); + } + + t.end(); + }, + e => t.error(e) + ); +}); + +test('ReadableStream integration test: adapting a sync pull source', t => { + const rs = sequentialReadableStream(10); + + readableStreamToArray(rs).then(chunks => { + t.equal(rs.source.closed, true, 'source should be closed after all chunks are read'); + t.deepEqual(chunks, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 'the expected 10 chunks should be read'); + + t.end(); + }); +}); + +test('ReadableStream integration test: adapting an async pull source', t => { + const rs = sequentialReadableStream(10, { async: true }); + + readableStreamToArray(rs).then(chunks => { + t.equal(rs.source.closed, true, 'source should be closed after all chunks are read'); + t.deepEqual(chunks, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 'the expected 10 chunks should be read'); + + t.end(); + }); }); diff --git a/reference-implementation/test/templated/readable-stream-closed-reader.js b/reference-implementation/test/templated/readable-stream-closed-reader.js new file mode 100644 index 000000000..f7b32dbd1 --- /dev/null +++ b/reference-implementation/test/templated/readable-stream-closed-reader.js @@ -0,0 +1,50 @@ +const tapeTest = require('tape-catch'); + +export default (label, factory) => { + function test(description, testFn) { + tapeTest(`${label}: ${description}`, testFn); + } + + test('read() should fulfill with { value: undefined, done: true }', t => { + t.plan(1); + const { reader } = factory(); + + reader.read().then( + v => t.deepEqual(v, { value: undefined, done: true }, 'read() should fulfill correctly'), + () => t.fail('read() should not return a rejected promise') + ); + }); + + test('closed should fulfill with undefined', t => { + t.plan(2); + const { stream, reader } = factory(); + + stream.closed.then( + v => t.equal(v, undefined, 'stream closed should fulfill with undefined'), + () => t.fail('stream closed should not reject') + ); + + reader.closed.then( + v => t.equal(v, undefined, 'reader closed should fulfill with undefined'), + () => t.fail('reader closed should not reject') + ); + }); + + test('cancel() should return a distinct fulfilled promise each time', t => { + t.plan(7); + const { stream, reader } = factory(); + + const cancelPromise1 = reader.cancel(); + const cancelPromise2 = reader.cancel(); + const closedStreamPromise = stream.closed; + const closedReaderPromise = reader.closed; + + cancelPromise1.then(v => t.equal(v, undefined, 'first cancel() call should fulfill with undefined')); + cancelPromise2.then(v => t.equal(v, undefined, 'second cancel() call should fulfill with undefined')); + t.notEqual(cancelPromise1, cancelPromise2, 'cancel() calls should return distinct promises'); + t.notEqual(cancelPromise1, closedStreamPromise, 'cancel() promise 1 should be distinct from stream.closed'); + t.notEqual(cancelPromise1, closedReaderPromise, 'cancel() promise 1 should be distinct from reader.closed'); + t.notEqual(cancelPromise2, closedStreamPromise, 'cancel() promise 2 should be distinct from stream.closed'); + t.notEqual(cancelPromise2, closedReaderPromise, 'cancel() promise 2 should be distinct from reader.closed'); + }); +}; diff --git a/reference-implementation/test/templated/readable-stream-closed.js b/reference-implementation/test/templated/readable-stream-closed.js new file mode 100644 index 000000000..28d17acbc --- /dev/null +++ b/reference-implementation/test/templated/readable-stream-closed.js @@ -0,0 +1,39 @@ +const tapeTest = require('tape-catch'); + +export default (label, factory) => { + function test(description, testFn) { + tapeTest(`${label}: ${description}`, testFn); + } + + test('closed should fulfill with undefined', t => { + t.plan(1); + const rs = factory(); + + rs.closed.then( + v => t.equal(v, undefined, 'closed should fulfill with undefined'), + () => t.fail('closed should not reject') + ); + }); + + test('cancel() should return a distinct fulfilled promise each time', t => { + t.plan(5); + const rs = factory(); + + const cancelPromise1 = rs.cancel(); + const cancelPromise2 = rs.cancel(); + const closedPromise = rs.closed; + + cancelPromise1.then(v => t.equal(v, undefined, 'first cancel() call should fulfill with undefined')); + cancelPromise2.then(v => t.equal(v, undefined, 'second cancel() call should fulfill with undefined')); + t.notEqual(cancelPromise1, cancelPromise2, 'cancel() calls should return distinct promises'); + t.notEqual(cancelPromise1, closedPromise, 'cancel() promise 1 should be distinct from closed'); + t.notEqual(cancelPromise2, closedPromise, 'cancel() promise 2 should be distinct from closed'); + }); + + test('getReader() should throw a TypeError', t => { + t.plan(1); + const rs = factory(); + + t.throws(() => rs.getReader(), /TypeError/, 'getReader() should fail'); + }); +}; diff --git a/reference-implementation/test/templated/readable-stream-empty-reader.js b/reference-implementation/test/templated/readable-stream-empty-reader.js new file mode 100644 index 000000000..ead5c6eca --- /dev/null +++ b/reference-implementation/test/templated/readable-stream-empty-reader.js @@ -0,0 +1,134 @@ +const tapeTest = require('tape-catch'); + +export default (label, factory) => { + function test(description, testFn) { + tapeTest(`${label}: ${description}`, testFn); + } + + test('instances have the correct methods and properties', t => { + const { reader } = factory(); + + t.ok(reader.closed, 'has a closed property'); + t.equal(typeof reader.closed.then, 'function', 'closed property is thenable'); + + t.equal(typeof reader.cancel, 'function', 'has a cancel method'); + t.equal(typeof reader.read, 'function', 'has a read method'); + t.equal(typeof reader.releaseLock, 'function', 'has a releaseLock method'); + + t.end(); + }); + + test('read() should never settle', t => { + const { reader } = factory(); + + reader.read().then( + () => t.fail('read() should not fulfill'), + () => t.fail('read() should not reject') + ); + + setTimeout(() => t.end(), 100); + }); + + test('two read()s should both never settle', t => { + const { reader } = factory(); + + reader.read().then( + () => t.fail('first read() should not fulfill'), + () => t.fail('first read() should not reject') + ); + + reader.read().then( + () => t.fail('second read() should not fulfill'), + () => t.fail('second read() should not reject') + ); + + setTimeout(() => t.end(), 100); + }); + + test('read() should return distinct promises each time', t => { + t.plan(1); + const { reader } = factory(); + + t.notEqual(reader.read(), reader.read(), 'the promises returned should be distinct'); + }); + + test('getReader() again on the stream should fail', t => { + t.plan(1); + const { stream } = factory(); + + t.throws(() => stream.getReader(), /TypeError/, 'stream.getReader() should throw a TypeError'); + }); + + test('releasing the lock with pending read requests should throw but the read requests should stay pending', t => { + const { reader } = factory(); + + reader.read().then( + () => t.fail('first read() should not fulfill'), + () => t.fail('first read() should not reject') + ); + + reader.read().then( + () => t.fail('second read() should not fulfill'), + () => t.fail('second read() should not reject') + ); + + reader.closed.then( + () => t.fail('closed should not fulfill'), + () => t.fail('closed should not reject') + ); + + t.throws(() => reader.releaseLock(), /TypeError/, 'releaseLock should throw a TypeError'); + t.equal(reader.isActive, true, 'the reader should still be active'); + + setTimeout(() => t.end(), 50); + }); + + test('releasing the lock should cause further read() calls to resolve as if the stream is closed', t => { + t.plan(3); + const { reader } = factory(); + + reader.releaseLock(); + t.equal(reader.isActive, false, 'the reader should no longer be active'); + + reader.read().then(r => + t.deepEqual(r, { value: undefined, done: true }, 'first read() should return closed result')); + reader.read().then(r => + t.deepEqual(r, { value: undefined, done: true }, 'second read() should return closed result')); + }); + + test('releasing the lock should cause closed to fulfill', t => { + t.plan(3); + const { stream, reader } = factory(); + + reader.closed.then(v => t.equal(v, undefined, 'reader.closed got before release should fulfill with undefined')); + stream.closed.then(() => t.fail('stream.closed got before release should not fulfill')); + + reader.releaseLock(); + t.equal(reader.isActive, false, 'the reader should no longer be active'); + + reader.closed.then(v => t.equal(v, undefined, 'reader.closed got after release should fulfill with undefined')); + stream.closed.then(() => t.fail('stream.closed got after release should not fulfill')); + }); + + test('canceling via the reader should cause the reader to become inactive', t => { + t.plan(3); + const { reader } = factory(); + + t.equal(reader.isActive, true, 'the reader should be active before releasing it'); + reader.cancel(); + t.equal(reader.isActive, false, 'the reader should no longer be active'); + reader.read().then(r => t.deepEqual(r, { value: undefined, done: true }, + 'read()ing from the reader should give a done result')) + }); + + test('canceling via the stream should cause the reader to become inactive', t => { + t.plan(3); + const { stream, reader } = factory(); + + t.equal(reader.isActive, true, 'the reader should be active before releasing it'); + stream.cancel(); + t.equal(reader.isActive, false, 'the reader should no longer be active'); + reader.read().then(r => t.deepEqual(r, { value: undefined, done: true }, + 'read()ing from the reader should give a done result')) + }); +}; diff --git a/reference-implementation/test/templated/readable-stream-empty.js b/reference-implementation/test/templated/readable-stream-empty.js new file mode 100644 index 000000000..2e6a62833 --- /dev/null +++ b/reference-implementation/test/templated/readable-stream-empty.js @@ -0,0 +1,21 @@ +const tapeTest = require('tape-catch'); + +export default (label, factory) => { + function test(description, testFn) { + tapeTest(`${label}: ${description}`, testFn); + } + + test('instances have the correct methods and properties', t => { + const rs = factory(); + + t.ok(rs.closed, 'has a closed property'); + t.equal(typeof rs.closed.then, 'function', 'closed property is thenable'); + + t.equal(typeof rs.cancel, 'function', 'has a cancel method'); + t.equal(typeof rs.getReader, 'function', 'has a getReader method'); + t.equal(typeof rs.pipeThrough, 'function', 'has a pipeThrough method'); + t.equal(typeof rs.pipeTo, 'function', 'has a pipeTo method'); + + t.end(); + }); +}; diff --git a/reference-implementation/test/templated/readable-stream-errored-reader.js b/reference-implementation/test/templated/readable-stream-errored-reader.js new file mode 100644 index 000000000..0078835e4 --- /dev/null +++ b/reference-implementation/test/templated/readable-stream-errored-reader.js @@ -0,0 +1,32 @@ +const tapeTest = require('tape-catch'); + +export default (label, factory, error) => { + function test(description, testFn) { + tapeTest(`${label}: ${description}`, testFn); + } + + test('closed should reject with the error', t => { + t.plan(2); + const { stream, reader } = factory(); + + stream.closed.then( + () => t.fail('stream closed should not fulfill'), + r => t.equal(r, error, 'stream closed should reject with the error') + ); + + reader.closed.then( + () => t.fail('stream closed should not fulfill'), + r => t.equal(r, error, 'stream closed should reject with the error') + ); + }); + + test('read() should reject with the error', t => { + t.plan(1); + const { reader } = factory(); + + reader.read().then( + () => t.fail('read() should not fulfill'), + r => t.equal(r, error, 'read() should reject with the error') + ); + }); +}; diff --git a/reference-implementation/test/templated/readable-stream-errored-sync-only.js b/reference-implementation/test/templated/readable-stream-errored-sync-only.js new file mode 100644 index 000000000..6380bca91 --- /dev/null +++ b/reference-implementation/test/templated/readable-stream-errored-sync-only.js @@ -0,0 +1,29 @@ +const tapeTest = require('tape-catch'); + +export default (label, factory, error) => { + function test(description, testFn) { + tapeTest(`${label}: ${description}`, testFn); + } + + test('cancel() should return a distinct rejected promise each time', t => { + t.plan(5); + const rs = factory(); + + const cancelPromise1 = rs.cancel(); + const cancelPromise2 = rs.cancel(); + const closedPromise = rs.closed; + + cancelPromise1.catch(e => t.equal(e, error, 'first cancel() call should reject with the error')); + cancelPromise2.catch(e => t.equal(e, error, 'second cancel() call should reject with the error')); + t.notEqual(cancelPromise1, cancelPromise2, 'cancel() calls should return distinct promises'); + t.notEqual(cancelPromise1, closedPromise, 'cancel() promise 1 should be distinct from closed'); + t.notEqual(cancelPromise2, closedPromise, 'cancel() promise 2 should be distinct from closed'); + }); + + test('getReader() should throw the error', t => { + t.plan(1); + const rs = factory(); + + t.throwsExactly(() => rs.getReader(), error, 'getReader() should throw the error'); + }); +}; diff --git a/reference-implementation/test/templated/readable-stream-errored.js b/reference-implementation/test/templated/readable-stream-errored.js new file mode 100644 index 000000000..6ec78e857 --- /dev/null +++ b/reference-implementation/test/templated/readable-stream-errored.js @@ -0,0 +1,17 @@ +const tapeTest = require('tape-catch'); + +export default (label, factory, error) => { + function test(description, testFn) { + tapeTest(`${label}: ${description}`, testFn); + } + + test('closed should reject with the error', t => { + t.plan(1); + const rs = factory(); + + rs.closed.then( + () => t.fail('closed should not fulfill'), + r => t.equal(r, error, 'closed should reject with the error') + ); + }); +}; diff --git a/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js b/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js new file mode 100644 index 000000000..2f933a2a9 --- /dev/null +++ b/reference-implementation/test/templated/readable-stream-two-chunks-closed-reader.js @@ -0,0 +1,112 @@ +const tapeTest = require('tape-catch'); + +export default (label, factory, chunks) => { + function test(description, testFn) { + tapeTest(`${label}: ${description}`, testFn); + } + + test('third read(), without waiting, should give { value: undefined, done: true }', t => { + t.plan(3); + + const { reader } = factory(); + + reader.read().then(r => t.deepEqual(r, { value: chunks[0], done: false }, 'first result should be correct')); + reader.read().then(r => t.deepEqual(r, { value: chunks[1], done: false }, 'second result should be correct')); + reader.read().then(r => t.deepEqual(r, { value: undefined, done: true }, 'third result should be correct')); + }); + + test('third read, with waiting, should give { value: undefined, done: true }', t => { + t.plan(3); + + const { reader } = factory(); + + reader.read().then(r => { + t.deepEqual(r, { value: chunks[0], done: false }, 'first result should be correct'); + + return reader.read().then(r => { + t.deepEqual(r, { value: chunks[1], done: false }, 'second result should be correct'); + + return reader.read().then(r => { + t.deepEqual(r, { value: undefined, done: true }, 'third result should be correct'); + }); + }); + }) + .catch(e => t.error(e)); + }); + + test('draining the stream via read() should cause the stream and reader closed promises to fulfill', t => { + t.plan(2); + + const { stream, reader } = factory(); + + stream.closed.then( + v => t.equal(v, undefined, 'stream closed should fulfill with undefined'), + () => t.fail('stream closed should not reject') + ); + + reader.closed.then( + v => t.equal(v, undefined, 'reader closed should fulfill with undefined'), + () => t.fail('reader closed should not reject') + ); + + reader.read(); + reader.read(); + }); + + test('releasing the lock after the stream is closed should do nothing', t => { + t.plan(2); + const { stream, reader } = factory(); + + stream.closed.then( + () => t.doesNotThrow(() => reader.releaseLock(), 'releasing the lock after stream closed should not throw') + ); + + reader.closed.then( + () => t.doesNotThrow(() => reader.releaseLock(), 'releasing the lock after reader closed should not throw') + ); + + reader.read(); + reader.read(); + }); + + test('releasing the lock should cause read() to act as if the stream is closed', t => { + t.plan(3); + const { reader } = factory(); + + reader.releaseLock(); + + reader.read().then(r => + t.deepEqual(r, { value: undefined, done: true }, 'first read() should return closed result')); + reader.read().then(r => + t.deepEqual(r, { value: undefined, done: true }, 'second read() should return closed result')); + reader.read().then(r => + t.deepEqual(r, { value: undefined, done: true }, 'third read() should return closed result')); + }); + + test('reader\'s closed property always returns the same promise', t => { + t.plan(6); + const { stream, reader } = factory(); + + const readerClosed = reader.closed; + + t.notEqual(readerClosed, stream.closed, 'reader.closed is not equal to stream.closed'); + t.equal(reader.closed, readerClosed, 'accessing reader.closed twice in succession gives the same value'); + + reader.read().then(() => { + t.equal(reader.closed, readerClosed, 'reader.closed is the same after read() fulfills'); + + reader.releaseLock(); + + t.equal(reader.closed, readerClosed, 'reader.closed is the same after releasing the lock'); + + stream.closed.then(() => { + t.equal(reader.closed, readerClosed, 'reader.closed is the same after the stream is closed'); + }); + + const newReader = stream.getReader(); + newReader.read(); + }); + + t.equal(reader.closed, readerClosed, 'reader.closed is the same after calling read()'); + }); +}; diff --git a/reference-implementation/test/templated/readable-stream-two-chunks-open-reader.js b/reference-implementation/test/templated/readable-stream-two-chunks-open-reader.js new file mode 100644 index 000000000..7f40055bc --- /dev/null +++ b/reference-implementation/test/templated/readable-stream-two-chunks-open-reader.js @@ -0,0 +1,52 @@ +const tapeTest = require('tape-catch'); + +export default (label, factory, chunks) => { + function test(description, testFn) { + tapeTest(`${label}: ${description}`, testFn); + } + + test('calling read() twice without waiting will eventually give both chunks', t => { + t.plan(2); + const { reader } = factory(); + + reader.read().then(r => t.deepEqual(r, { value: chunks[0], done: false }, 'first result should be correct')); + reader.read().then(r => t.deepEqual(r, { value: chunks[1], done: false }, 'second result should be correct')); + }); + + test('calling read() twice with waiting will eventually give both chunks', t => { + t.plan(2); + const { reader } = factory(); + + reader.read().then(r => { + t.deepEqual(r, { value: chunks[0], done: false }, 'first result should be correct'); + + return reader.read().then(r => { + t.deepEqual(r, { value: chunks[1], done: false }, 'second result should be correct'); + }); + }) + .catch(e => t.error(e)); + }); + + test('read() should return distinct promises each time', t => { + t.plan(1); + const { reader } = factory(); + + t.notEqual(reader.read(), reader.read(), 'the promises returned should be distinct'); + }); + + test('cancel() after a read() should still give that single read result', t => { + t.plan(4); + const { stream, reader } = factory(); + + stream.closed.then(v => t.equal(v, undefined, 'stream closed should fulfill with undefined')); + reader.closed.then(v => t.equal(v, undefined, 'reader closed should fulfill with undefined')); + + reader.read().then(r => t.deepEqual(r, { value: chunks[0], done: false }, + 'promise returned before cancellation should fulfill with a chunk')); + + reader.cancel(); + + reader.read().then(r => t.deepEqual(r, { value: undefined, done: true }, + 'promise returned after cancellation should fulfill with an end-of-stream signal')); + }); +}; diff --git a/reference-implementation/test/transform-stream-errors.js b/reference-implementation/test/transform-stream-errors.js index 63e4cf5e5..fa0ec63eb 100644 --- a/reference-implementation/test/transform-stream-errors.js +++ b/reference-implementation/test/transform-stream-errors.js @@ -1,7 +1,7 @@ const test = require('tape-catch'); test('TransformStream errors thrown in transform put the writable and readable in an errored state', t => { - t.plan(8); + t.plan(5); const thrownError = new Error('bad things are happening!'); const ts = new TransformStream({ @@ -10,19 +10,9 @@ test('TransformStream errors thrown in transform put the writable and readable i } }); - t.equal(ts.readable.state, 'readable', 'readable starts in readable'); t.equal(ts.writable.state, 'writable', 'writable starts in writable'); - ts.writable.write('a'); - - t.equal(ts.writable.state, 'waiting', 'writable becomes waiting immediately after throw'); - - setTimeout(() => { - t.equal(ts.readable.state, 'errored', 'readable becomes errored after writing to the throwing transform'); - t.equal(ts.writable.state, 'errored', 'writable becomes errored after writing to the throwing transform'); - }, 0); - - ts.readable.read().then( + ts.readable.getReader().read().then( () => t.fail('readable\'s read() should reject'), r => t.equal(r, thrownError, 'readable\'s read should reject with the thrown error') ); @@ -36,10 +26,13 @@ test('TransformStream errors thrown in transform put the writable and readable i () => t.fail('writable\'s closed should not be fulfilled'), e => t.equal(e, thrownError, 'writable\'s closed should be rejected with the thrown error') ); + + ts.writable.write('a'); + t.equal(ts.writable.state, 'waiting', 'writable becomes waiting immediately after throw'); }); test('TransformStream errors thrown in flush put the writable and readable in an errored state', t => { - t.plan(11); + t.plan(6); const thrownError = new Error('bad things are happening!'); const ts = new TransformStream({ @@ -51,25 +44,7 @@ test('TransformStream errors thrown in flush put the writable and readable in an } }); - t.equal(ts.readable.state, 'readable', 'readable starts in readable'); - t.equal(ts.writable.state, 'writable', 'writable starts in writable'); - - ts.writable.write('a'); - - t.equal(ts.readable.state, 'readable', 'readable stays in waiting after a write'); - t.equal(ts.writable.state, 'waiting', 'writable becomes waiting after a write'); - - ts.writable.close(); - - t.equal(ts.readable.state, 'readable', 'readable stays in readable after the close call'); - t.equal(ts.writable.state, 'closing', 'writable becomes closing after the close call'); - - setTimeout(() => { - t.equal(ts.readable.state, 'errored', 'readable becomes errored after closing with the throwing flush'); - t.equal(ts.writable.state, 'errored', 'writable becomes errored after closing with the throwing flush'); - }, 0); - - ts.readable.read().then( + ts.readable.getReader().read().then( () => t.fail('readable\'s read() should reject'), r => t.equal(r, thrownError, 'readable\'s read should reject with the thrown error') ); @@ -83,4 +58,10 @@ test('TransformStream errors thrown in flush put the writable and readable in an () => t.fail('writable\'s closed should not be fulfilled'), e => t.equal(e, thrownError, 'writable\'s closed should be rejected with the thrown error') ); + + t.equal(ts.writable.state, 'writable', 'writable starts in writable'); + ts.writable.write('a'); + t.equal(ts.writable.state, 'waiting', 'writable becomes waiting after a write'); + ts.writable.close(); + t.equal(ts.writable.state, 'closing', 'writable becomes closing after the close call'); }); diff --git a/reference-implementation/test/transform-stream.js b/reference-implementation/test/transform-stream.js index 4a75bce8a..837ff41a0 100644 --- a/reference-implementation/test/transform-stream.js +++ b/reference-implementation/test/transform-stream.js @@ -24,12 +24,11 @@ test('TransformStream instances must have writable and readable properties of th t.ok(ts.readable instanceof ReadableStream, 'readable is an instance of ReadableStream'); }); -test('TransformStream writables and readables start in the expected states', t => { - t.plan(2); +test('TransformStream writable starts in the writable state', t => { + t.plan(1); const ts = new TransformStream({ transform() { } }); t.equal(ts.writable.state, 'writable', 'writable starts writable'); - t.equal(ts.readable.state, 'readable', 'readable starts readable'); }); test('Pass-through sync TransformStream: can read from readable what is put into writable', t => { @@ -45,8 +44,10 @@ test('Pass-through sync TransformStream: can read from readable what is put into ts.writable.write('a'); t.equal(ts.writable.state, 'waiting', 'writable is waiting after one write'); - ts.readable.read().then(chunk => { - t.equal(chunk, 'a', 'result from reading the readable is the same as was written to writable'); + ts.readable.getReader().read().then(result => { + t.deepEqual(result, { value: 'a', done: false }, + 'result from reading the readable is the same as was written to writable'); + return ts.writable.ready.then(() => { t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); }); @@ -68,8 +69,10 @@ test('Uppercaser sync TransformStream: can read from readable transformed versio t.equal(ts.writable.state, 'waiting', 'writable is waiting after one write'); - ts.readable.read().then(chunk => { - t.equal(chunk, 'A', 'result from reading the readable is the transformation of what was written to writable'); + ts.readable.getReader().read().then(result => { + t.deepEqual(result, { value: 'A', done: false }, + 'result from reading the readable is the transformation of what was written to writable'); + return ts.writable.ready.then(() => { t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); }); @@ -92,11 +95,15 @@ test('Uppercaser-doubler sync TransformStream: can read both chunks put into the t.equal(ts.writable.state, 'waiting', 'writable is waiting after one write'); - ts.readable.read().then(chunk1 => { - t.equal(chunk1, 'A', 'the first chunk read is the transformation of the single chunk written'); + const reader = ts.readable.getReader(); + + reader.read().then(result1 => { + t.deepEqual(result1, { value: 'A', done: false }, + 'the first chunk read is the transformation of the single chunk written'); - return ts.readable.read().then(chunk2 => { - t.equal(chunk2, 'A', 'the second chunk read is also the transformation of the single chunk written'); + return reader.read().then(result2 => { + t.deepEqual(result2, { value: 'A', done: false }, + 'the second chunk read is also the transformation of the single chunk written'); return ts.writable.ready.then(() => { t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); @@ -120,8 +127,10 @@ test('Uppercaser async TransformStream: can read from readable transformed versi t.equal(ts.writable.state, 'waiting', 'writable is waiting after one write'); - ts.readable.read().then(chunk => { - t.equal(chunk, 'A', 'result from reading the readable is the transformation of what was written to writable'); + ts.readable.getReader().read().then(result => { + t.deepEqual(result, { value: 'A', done: false }, + 'result from reading the readable is the transformation of what was written to writable'); + return ts.writable.ready.then(() => { t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); }); @@ -140,14 +149,18 @@ test('Uppercaser-doubler async TransformStream: can read both chunks put into th } }); + const reader = ts.readable.getReader(); + ts.writable.write('a'); t.equal(ts.writable.state, 'waiting', 'writable is waiting after one write'); - ts.readable.read().then(chunk1 => { - t.equal(chunk1, 'A', 'the first chunk read is the transformation of the single chunk written'); + reader.read().then(result1 => { + t.deepEqual(result1, { value: 'A', done: false }, + 'the first chunk read is the transformation of the single chunk written'); - return ts.readable.read().then(chunk2 => { - t.equal(chunk2, 'A', 'the second chunk read is also the transformation of the single chunk written'); + return reader.read().then(result2 => { + t.deepEqual(result2, { value: 'A', done: false }, + 'the second chunk read is also the transformation of the single chunk written'); return ts.writable.ready.then(() => { t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); @@ -158,21 +171,18 @@ test('Uppercaser-doubler async TransformStream: can read both chunks put into th }); test('TransformStream: by default, closing the writable closes the readable (when there are no queued writes)', t => { - t.plan(4); + t.plan(3); const ts = new TransformStream({ transform() { } }); ts.writable.close(); t.equal(ts.writable.state, 'closing', 'writable is closing'); - setTimeout(() => { - t.equal(ts.readable.state, 'closed', 'readable is closed within a tick'); - ts.writable.closed.then(() => { - t.equal(ts.writable.state, 'closed', 'writable becomes closed eventually'); - t.equal(ts.readable.state, 'closed', 'readable is still closed at that time'); - }) - .catch(t.error); - }, 0); + Promise.all([ts.writable.closed, ts.readable.closed]).then(() => { + t.pass('both writable and readable closed promises fulfill'); + t.equal(ts.writable.state, 'closed', 'writable state becomes closed eventually'); + }) + .catch(e => t.error(e)); }); test('TransformStream: by default, closing the writable waits for transforms to finish before closing both', t => { @@ -187,19 +197,25 @@ test('TransformStream: by default, closing the writable waits for transforms to ts.writable.write('a'); ts.writable.close(); t.equal(ts.writable.state, 'closing', 'writable is closing'); + + let rsClosed = false; + ts.readable.closed.then(() => { + rsClosed = true; + }); + setTimeout(() => { - t.equal(ts.readable.state, 'readable', 'readable is still readable after a tick'); + t.equal(rsClosed, false, 'readable is not closed after a tick'); ts.writable.closed.then(() => { t.equal(ts.writable.state, 'closed', 'writable becomes closed eventually'); - t.equal(ts.readable.state, 'closed', 'readable is closed at that point'); + t.equal(rsClosed, true, 'readable is closed at that point'); }) - .catch(t.error); + .catch(e => t.error(e)); }, 0); }); test('TransformStream: by default, closing the writable closes the readable after sync enqueues and async done', t => { - t.plan(6); + t.plan(3); const ts = new TransformStream({ transform(chunk, enqueue, done) { @@ -212,23 +228,19 @@ test('TransformStream: by default, closing the writable closes the readable afte ts.writable.write('a'); ts.writable.close(); t.equal(ts.writable.state, 'closing', 'writable is closing'); - t.equal(ts.readable.state, 'readable', 'readable is readable'); ts.writable.closed.then(() => { t.equal(ts.writable.state, 'closed', 'writable becomes closed eventually'); - t.equal(ts.readable.state, 'readable', 'readable is still readable at that time'); return readableStreamToArray(ts.readable).then(chunks => { - t.deepEquals(chunks, ['x', 'y'], 'both enqueued chunks can be read from the readable'); - - t.equal(ts.readable.state, 'closed', 'after reading, the readable is now closed'); + t.deepEqual(chunks, ['x', 'y'], 'both enqueued chunks can be read from the readable'); }); }) - .catch(t.error); + .catch(e => t.error(e)); }); test('TransformStream: by default, closing the writable closes the readable after async enqueues and async done', t => { - t.plan(6); + t.plan(3); const ts = new TransformStream({ transform(chunk, enqueue, done) { @@ -241,19 +253,15 @@ test('TransformStream: by default, closing the writable closes the readable afte ts.writable.write('a'); ts.writable.close(); t.equal(ts.writable.state, 'closing', 'writable is closing'); - t.equal(ts.readable.state, 'readable', 'readable is readable'); ts.writable.closed.then(() => { t.equal(ts.writable.state, 'closed', 'writable becomes closed eventually'); - t.equal(ts.readable.state, 'readable', 'readable is still readable at that time'); return readableStreamToArray(ts.readable).then(chunks => { - t.deepEquals(chunks, ['x', 'y'], 'both enqueued chunks can be read from the readable'); - - t.equal(ts.readable.state, 'closed', 'after reading, the readable is now closed'); + t.deepEqual(chunks, ['x', 'y'], 'both enqueued chunks can be read from the readable'); }); }) - .catch(t.error); + .catch(e => t.error(e)); }); test('TransformStream flush is called immediately when the writable is closed, if no writes are queued', t => { @@ -289,9 +297,14 @@ test('TransformStream flush is called after all queued writes finish, once the w ts.writable.close(); t.notOk(flushCalled, 'closing the writable does not immediately call flush if writes are not finished'); + let rsClosed = false; + ts.readable.closed.then(() => { + rsClosed = true; + }); + setTimeout(() => { t.ok(flushCalled, 'flush is eventually called'); - t.equal(ts.readable.state, 'readable', 'if flush does not call close, the readable stays readable'); + t.equal(rsClosed, false, 'if flush does not call close, the readable does not become closed'); }, 50); }); @@ -308,16 +321,18 @@ test('TransformStream flush gets a chance to enqueue more into the readable', t } }); + const reader = ts.readable.getReader(); + ts.writable.write('a'); ts.writable.close(); - ts.readable.read().then(chunk1 => { - t.equal(chunk1, 'x', 'the first chunk read is the transformation of the single chunk written'); + reader.read().then(result1 => { + t.deepEqual(result1, { value: 'x', done: false }, 'the first chunk read is the first one enqueued in flush'); - return ts.readable.read().then(chunk2 => { - t.equal(chunk2, 'y', 'the second chunk read is also the transformation of the single chunk written'); + return reader.read().then(result2 => { + t.deepEqual(result2, { value: 'y', done: false }, 'the second chunk read is the second one enqueued in flush'); }); }) - .catch(t.error); + .catch(e => t.error(e)); }); test('TransformStream flush gets a chance to enqueue more into the readable, and can then async close', t => { @@ -334,19 +349,21 @@ test('TransformStream flush gets a chance to enqueue more into the readable, and } }); + const reader = ts.readable.getReader(); + ts.writable.write('a'); ts.writable.close(); - ts.readable.read().then(chunk1 => { - t.equal(chunk1, 'x', 'the first chunk read is the transformation of the single chunk written'); + reader.read().then(result1 => { + t.deepEqual(result1, { value: 'x', done: false }, 'the first chunk read is the first one enqueued in flush'); - return ts.readable.read().then(chunk2 => { - t.equal(chunk2, 'y', 'the second chunk read is also the transformation of the single chunk written'); + return reader.read().then(result2 => { + t.deepEqual(result2, { value: 'y', done: false }, 'the second chunk read is the second one enqueued in flush'); }); }) - .catch(t.error); + .catch(e => t.error(e)); ts.readable.closed.then(() => { - t.equal(ts.readable.state, 'closed', 'the readable eventually does close, after close is called from flush'); + t.pass('readable becomes closed'); }) - .catch(t.error); + .catch(e => t.error(e)); }); diff --git a/reference-implementation/test/utils/readable-stream-to-array.js b/reference-implementation/test/utils/readable-stream-to-array.js index d8eba913c..7c8b3d3db 100644 --- a/reference-implementation/test/utils/readable-stream-to-array.js +++ b/reference-implementation/test/utils/readable-stream-to-array.js @@ -1,15 +1,17 @@ export default function readableStreamToArray(readable) { const chunks = []; + const reader = readable.getReader(); return pump(); function pump() { - return readable.read().then(chunk => { - if (chunk === ReadableStream.EOS) { + return reader.read().then(({ value, done }) => { + if (done) { + reader.releaseLock(); return chunks; } - chunks.push(chunk); + chunks.push(value); return pump(); }); }