diff --git a/Locking Design Doc.md b/Locking Design Doc.md new file mode 100644 index 000000000..7159754ea --- /dev/null +++ b/Locking Design Doc.md @@ -0,0 +1,108 @@ +# Locking a Stream for Exclusive Reading + +In [#241](https://github.com/whatwg/streams/issues/241) we had a great conversation about the need for being able to "lock" a stream for exclusive use. This would be done implicitly while piping, but could also be useful for building user-facing abstractions, as we'll see below. + +What emerged was the idea of a "stream reader," which has most of the readable stream interface, but while it exists you cannot read from the stream except through that reader. + +This document represents some formative rationales for the design of the reader concept, approached from the perspective of a developer that uses increasingly complex features of the streams ecosystem. + +## Developer usage + +### Level 0: no reader usage + +If the developer knows nothing about readers, they can continue using the stream just fine. + +- `read()`, `state`, and `ready` all behave as they do now if used without `pipeTo`. +- `pipeTo` will cause the following side effects: + - `read()` will throw an informative error + - `state` will return `"waiting"` until the pipe completes (successfully or otherwise) + - `ready` will return a promise that remains pending until the pipe completes + +### Level 1: using readers directly + +The developer might want to create their own abstractions that require exclusive access to the stream. For example, a read-to-end function would probably want to avoid others being able to call `.read()` in the middle. + +Example code: + +```js +function readAsJson(rs) { + var string = ""; + var reader = rs.getReader(); + + pump(); + + // These lines would be simpler with `Promise.prototype.finally` (or async functions). + return reader.closed.then( + () => { + reader.releaseLock(); + return JSON.parse(string); + }, + e => { + reader.releaseLock(); + throw e; + } + ); + + function pump() { + while (reader.state === "readable") { + string += reader.read(); + } + if (reader.state === "waiting") { + reader.ready.then(pump); + } + } +} +``` + +The stream would have the same behaviors after being passed to `readAsJson` that it would have after calling its `pipeTo` method. + +The reader should have all of the non-piping-related public interface of the stream. This includes: + +- `closed` getter, which is a pass-through +- `state` and `ready` getters, which reveal the "true" state and state transitions of the stream which the stream itself no longer reveals +- `read()` method, which has the same behavior as that of the stream's except that it works while the stream is locked +- `cancel()` method, which first calls `this.releaseLock()` before the pass-through + +While a stream is locked, it is indistinguishable from a stream that has been drained of all chunks and is not getting any more enqueued. We could consider adding some kind of test, like `stream.isLocked`, to distinguish. However, it's not clear there's a compelling reason for doing so (let us know if so?), and the indistinguishability is kind of a nice property from the perspective of the principle of least authority. + +For readers, you should be able to tell if they're still active (i.e. have not been released) via `reader.isActive`. + +### Level 2: subclassers of `ReadableStream` + +Subclasses of `ReadableStream` should get locking support "for free." The same mechanisms for acquiring and using a lock should work flawlessly. More interestingly, if they wanted to support modifying the behavior of e.g. `read()` (or `state` or `ready` or `closed`), they should only have to override it in one location. + +Which location is more friendly? Probably in `ReadableStream`, so that `ExclusiveStreamReader` still works for `ReadableStream` subclasses. Less work. + +This means `ExclusiveStreamReader` should delegate to `ReadableStream`, and not the other way around. + +### Level 3: custom readable stream implementations? + +It is unclear whether this is necessary, but up until now we have a high level of support for anyone who wants to re-implement the entire `ReadableStream` interface with their own specific code. For example, if you implement `state`, `ready`, `closed`, `read()`, and `cancel()`, you can do `myCustomStream.pipeTo = ReadableStream.prototype.pipeTo` and it will continue to work. + +If we encourage this kind of thing, we should make it easy for custom readable streams to be lockable as well. That basically means `ExclusiveStreamReader` should not require knowledge of `ReadableStream`'s internal slots. + +We can work around this if necessary by passing `ExclusiveStreamReader` any capabilities it needs to manipulate `ReadableStream`'s internal state; then people reimplementing the readable stream interface can do e.g. `new ExclusiveStreamReader(this, { getLock, setLock })` or similar. + +## Optimizability + +The need to support subclassing, via `ExclusiveStreamReader` delegating to the `ReadableStream` implementation, conflicts a bit with the desire for readers to be fast. However, this can be fixed with some cleverness. + +The spec semantics for e.g. `reader.read()` are essentially: + +- Check that `reader@[[stream]]` is locked to `reader`. +- Unlock `reader@[[stream]]`. +- Try `return reader@[[stream]].read()`; finally re-lock `reader@[[stream]]`. + +This will ensure that if `reader@[[stream]]` is a subclass of `ReadableStream`, it will polymorphically dispatch to the subclass's `read` method. However, this kind of try/finally pattern is not very optimizable in V8. + +Here is an optimization that can be performed instead, with slight tweaks to both `ReadableStream.prototype.read` and `ExclusiveStreamReader.prototype.read`: + +- Define `ReadableStream.prototype.read` as: + - Check that `this` is not locked. + - Return `ReadFromReadableStream(this)`. (That is, extract the main functionality, without the check, into its own function.) +- Define `ExclusiveStreamReader.prototype.read` like so: + - Check that `this@[[stream]]` is locked to `this`. + - If `this@[[stream]].read` is equal to the original `ReadableStream.prototype.read`: return `ReadFromReadableStream(this@[[stream]])`. + - Otherwise, proceed via the per-spec semantics above. + +This essentially ensures that all undisturbed readable streams, or readable stream subclasses that do not override `read`, go down the "fast path" by ignoring all the try/finally and lock/unlock business. It is unobservable, since we have checked that `read` has not been modified in any way. diff --git a/index.bs b/index.bs index 2debb2763..0037a3aef 100644 --- a/index.bs +++ b/index.bs @@ -148,6 +148,22 @@ based the total size of all chunks in the stream's internal queue. backpressure signal. +

Locking

+ + + +An exclusive stream reader or simply reader is an object that encapsulates a readable stream, +preventing access to the stream except through the reader's interface. We say in this case the stream is +locked to the reader, and that the reader is +active. + +The reader presents most of the stream's interface, but while it is active, only the reader's methods and properties +can be used to successfully manipulate and interrogate the state of the stream; when the stream is used directly, it +appears as if it is empty. + +A reader also has the capability to release its read lock, which makes it no +longer active. At this point the original stream can be used as before, and the reader becomes inert. +

Readable Streams

Introduction to Readable Streams

@@ -376,6 +392,7 @@ would look like get state() cancel(reason) + getReader() pipeThrough({ writable, readable }, options) pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) read() @@ -433,6 +450,10 @@ Instances of ReadableStream are created with the internal slots des \[[queue]] A List representing the stream's internal queue of chunks + + \[[reader]] + A ExclusiveStreamReader instance, if the stream is locked to an exclusive reader, or + undefined if it is not \[[started]] A boolean flag indicating whether the underlying source has finished starting @@ -493,6 +514,7 @@ Instances of ReadableStream are created with the internal slots des
  • Set this@\[[queue]] to a new empty List.
  • Set this@\[[state]] to "waiting".
  • Set this@\[[started]], this@\[[draining]], and this@\[[pulling]] to false. +
  • Set this@\[[reader]] to undefined.
  • Set this@\[[enqueue]] to CreateReadableStreamEnqueueFunction(this).
  • Set this@\[[close]] to CreateReadableStreamCloseFunction(this).
  • Set this@\[[error]] to CreateReadableStreamErrorFunction(this). @@ -532,6 +554,7 @@ Instances of ReadableStream are created with the internal slots des
      +
    1. If this@\[[reader]] is not undefined, return this@\[[reader]]@\[[lockReleased]].
    2. Return this@\[[readyPromise]].
    @@ -553,9 +576,12 @@ Instances of ReadableStream are created with the internal slots des
    "errored"
    An error occurred interacting with the underlying source, and so the stream is now dead. + + If the stream is locked to a reader, the stream will appear to be "waiting".
      +
    1. If this@\[[reader]] is not undefined, return "waiting".
    2. Return this@\[[state]].
    @@ -565,19 +591,44 @@ Instances of ReadableStream are created with the internal slots des The cancel method signals a loss of interest in the stream by a consumer. Calling it will immediately move the stream to a "closed" state, throwing away any queued data, as well as executing any cancellation mechanism of the underlying source. + + Readable streams cannot be cancelled while locked to a reader; this method will return a rejected promise.
      +
    1. If this@\[[reader]] is not undefined, return a new promise rejected with a TypeError.
    2. If this@\[[state]] is "closed", return a new promise resolved with undefined.
    3. If this@\[[state]] is "errored", return a new promise rejected with this@\[[storedError]].
    4. If this@\[[state]] is "waiting", resolve this@\[[readyPromise]] with undefined.
    5. Let this@\[[queue]] be a new empty List. -
    6. Set this@\[[state]] to "closed". -
    7. Resolve this@\[[closedPromise]] with undefined. +
    8. Call-with-rethrow CloseReadableStream(this).
    9. Let sourceCancelPromise be the result of promise-calling this@\[[onCancel]](reason).
    10. Return the result of transforming sourceCancelPromise by a fulfillment handler that returns undefined.
    +
    getReader()
    + +
    + The getReader method creates an exclusive stream reader and + locks the stream to the the new reader. While the stream is locked, it cannot be + manipulated directly, and will appear to be an inert, empty stream waiting for new chunks to be enqueued. + Instead, the returned reader object can be used to read from or cancel the stream, or to discern its state and state + transitions. If or when the lock is released, the stream can be used again as + normal. + + This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its + entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours, interfering + with your abstraction or observing its side-effects. + + Note that when a stream is closed or errors, any reader it is locked to is automatically released. +
    + +
      +
    1. If this@\[[state]] is "closed", throw a TypeError exception. +
    2. If this@\[[state]] is "errored", throw this@\[[storedError]]. +
    3. Return Construct(ExclusiveStreamReader, (this)). +
    +
    pipeThrough({ writable, readable }, options)
    @@ -621,6 +672,7 @@ look for the pipeTo method.
      +
    1. If this@\[[reader]] is not undefined, throw a TypeError exception.
    2. If this@\[[state]] is "waiting" or "closed", throw a TypeError exception.
    3. If this@\[[state]] is "errored", throw this@\[[storedError]].
    4. Assert: this@\[[state]] is "readable". @@ -628,11 +680,7 @@ look for the pipeTo method.
    5. Let chunk be DequeueValue(this@\[[queue]]).
    6. If this@\[[queue]] is now empty,
        -
      1. If this@\[[draining]] is true, -
          -
        1. Set this@\[[state]] to "closed". -
        2. Resolve this@\[[closedPromise]] with undefined. -
        +
      2. If this@\[[draining]] is true, call-with-rethrow CloseReadableStream(this).
      3. If this@\[[draining]] is false,
        1. Set this@\[[state]] to "waiting". @@ -643,6 +691,76 @@ look for the pipeTo method.
        2. Return chunk.
        +

        Class ExclusiveStreamReader

        + +

        Class Definition

        + +This section is non-normative. + +If one were to write the ExclusiveStreamReader class in something close to the syntax of [[!ECMASCRIPT]], +it would look like + +
        
        +  class ExclusiveStreamReader {
        +    constructor(stream)
        +
        +    get closed()
        +    get isActive()
        +    get ready()
        +    get state()
        +
        +    cancel(reason, ...args)
        +    read(...args)
        +    releaseLock()
        +  }
        +
        + +

        Internal Slots

        + +Instances of ExclusiveStreamReader are created with the internal slots described in the following table: + + + + + + + + + + + + +
        Internal SlotDescription (non-normative)
        \[[stream]] + A ReadableStream instance that this reader is able to read from +
        \[[lockReleased]] + A promise that becomes fulfilled when the reader releases its lock on the stream +
        + +

        new ExclusiveStreamReader(stream)

        + +
          +
        1. If stream does not have a \[[reader]] internal slot, throw a TypeError exception. +
        2. If stream@\[[reader]] is not undefined, throw a TypeError exception. +
        3. Set stream@\[[reader]] to this. +
        4. Set this@\[[stream]] to stream. +
        5. Set this@\[[lockReleased]] to a new promise. +
        + +

        Properties of the ExclusiveStreamReader Prototype

        + +
        get closed
        + +
        get ready
        + +
        get state
        + +
        cancel(reason, ...args)
        + +
        read(...args)
        + +
        releaseLock
        + +

        Readable Stream Abstract Operations

        CallReadableStreamPull ( stream )

        @@ -664,6 +782,15 @@ look for the pipeTo method.
      4. Otherwise, return undefined.
      +

      CloseReadableStream ( stream )

      + +
        +
      1. Set stream@\[[state]] to "closed". +
      2. Resolve stream@\[[closedPromise]] with undefined. +
      3. If stream@\[[reader]] is not undefined, call-with-rethrow Invoke(stream@\[[reader]], "releaseLock"). +
      4. Return undefined. +
      +

      CreateReadableStreamCloseFunction ( stream )

        @@ -677,8 +804,7 @@ A Readable Stream Close Function is a built-in anonymous function of
      1. If stream@\[[state]] is "waiting",
        1. Resolve stream@\[[readyPromise]] with undefined. -
        2. Resolve stream@\[[closedPromise]] with undefined. -
        3. Set stream@\[[state]] to "closed". +
        4. Return CloseReadableStream(this).
      2. If stream@\[[state]] is "readable",
          @@ -728,19 +854,14 @@ A Readable Stream Error Function is a built-in anonymous function of a variable stream, that performs the following steps:
            -
          1. If stream@\[[state]] is "waiting", -
              -
            1. Set stream@\[[state]] to "errored". -
            2. Set stream@\[[storedError]] to e. -
            3. Resolve stream@\[[readyPromise]] with undefined. -
            4. Reject stream@\[[closedPromise]] with e. -
            -
          2. If stream@\[[state]] is "readable", +
          3. If stream@\[[state]] is "waiting", resolve stream@\[[readyPromise]] with undefined. +
          4. If stream@\[[state]] is "readable", let stream@\[[queue]] be a new empty List. +
          5. If stream@\[[state]] is either "waiting" or "readable",
              -
            1. Let stream@\[[queue]] be a new empty List.
            2. Set stream@\[[state]] to "errored".
            3. Set stream@\[[storedError]] to e.
            4. Reject stream@\[[closedPromise]] with e. +
            5. If stream@\[[reader]] is not undefined, call-with-rethrow Invoke(stream@\[[reader]], "releaseLock").
          diff --git a/reference-implementation/lib/exclusive-stream-reader.js b/reference-implementation/lib/exclusive-stream-reader.js new file mode 100644 index 000000000..52b508930 --- /dev/null +++ b/reference-implementation/lib/exclusive-stream-reader.js @@ -0,0 +1,84 @@ +var assert = require('assert'); + +export default class ExclusiveStreamReader { + constructor(stream) { + if (!('_reader' in stream)) { + throw new TypeError('ExclusiveStreamReader can only be used with ReadableStream objects or subclasses'); + } + + if (stream._reader !== undefined) { + throw new TypeError('This stream has already been locked for exclusive reading by another reader'); + } + + stream._reader = this; + + this._stream = stream; + + this._lockReleased = new Promise(resolve => { + this._lockReleased_resolve = resolve; + }); + } + + get ready() { + if (this._stream._reader !== this) { + return this._stream.ready; + } + + this._stream._reader = undefined; + try { + return this._stream.ready; + } finally { + this._stream._reader = this; + } + } + + get state() { + if (this._stream._reader !== this) { + return this._stream.state; + } + + this._stream._reader = undefined; + try { + return this._stream.state; + } finally { + this._stream._reader = this; + } + } + + get closed() { + return this._stream.closed; + } + + get isActive() { + return this._stream._reader === this; + } + + read(...args) { + if (this._stream._reader !== this) { + throw new TypeError('This stream reader has released its lock on the stream and can no longer be used'); + } + + this._stream._reader = undefined; + try { + return this._stream.read(...args); + } finally { + this._stream._reader = this; + } + } + + cancel(reason, ...args) { + if (this._stream._reader !== this) { + return Promise.reject( + new TypeError('This stream reader has released its lock on the stream and can no longer be used')); + } + + var stream = this._stream; + this.releaseLock(); + return stream.cancel(reason, ...args); + } + + releaseLock() { + this._stream._reader = undefined; + this._lockReleased_resolve(undefined); + } +} diff --git a/reference-implementation/lib/experimental/readable-byte-stream.js b/reference-implementation/lib/experimental/readable-byte-stream.js index 7696181b1..0e4ac4acf 100644 --- a/reference-implementation/lib/experimental/readable-byte-stream.js +++ b/reference-implementation/lib/experimental/readable-byte-stream.js @@ -57,6 +57,7 @@ export default class ReadableByteStream { } } + this._reader = undefined; this._state = 'waiting'; this._onReadInto = readInto; @@ -80,6 +81,10 @@ export default class ReadableByteStream { } get state() { + if (this._reader !== undefined) { + return 'waiting'; + } + return this._state; } @@ -169,14 +174,11 @@ export default class ReadableByteStream { return resizedArrayBuffer; } - // Note: We plan to make this more efficient in the future. But for now this - // implementation suffices to show interoperability with a generic - // WritableStream. - pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { - ReadableStream.prototype.pipeTo.call(this, dest, {preventClose, preventAbort, preventCancel}); - } - get ready() { + if (this._reader !== undefined) { + return this._reader._lockReleased; + } + return this._readyPromise; } @@ -192,6 +194,7 @@ export default class ReadableByteStream { } this._state = 'closed'; + this._reader = undefined; this._resolveClosedPromise(undefined); return new Promise((resolve, reject) => { @@ -208,6 +211,10 @@ export default class ReadableByteStream { } get closed() { + if (this._reader !== undefined) { + return this._reader._lockReleased.then(() => this._closedPromise); + } + return this._closedPromise; } @@ -223,3 +230,12 @@ export default class ReadableByteStream { this._closedPromise_reject = null; } } + +// Note: We plan to make this more efficient in the future. But for now this +// implementation suffices to show interoperability with a generic +// WritableStream. +ReadableByteStream.prototype.pipeTo = ReadableStream.prototype.pipeTo; + +// These can be direct copies. Per spec though they probably should not be === since that might preclude optimizations. +ReadableByteStream.prototype.pipeThrough = ReadableStream.prototype.pipeThrough; +ReadableByteStream.prototype.getReader = ReadableStream.prototype.getReader; diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 504559b8c..9657f385a 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -2,6 +2,7 @@ var assert = require('assert'); import * as helpers from './helpers'; import { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize } from './queue-with-sizes'; import CountQueuingStrategy from './count-queuing-strategy'; +import ExclusiveStreamReader from './exclusive-stream-reader'; export default class ReadableStream { constructor({ @@ -30,6 +31,7 @@ export default class ReadableStream { this._started = false; this._draining = false; this._pulling = false; + this._reader = undefined; this._enqueue = CreateReadableStreamEnqueueFunction(this); this._close = CreateReadableStreamCloseFunction(this); @@ -50,10 +52,19 @@ export default class ReadableStream { } get state() { + if (this._reader !== undefined) { + return 'waiting'; + } + return this._state; } cancel(reason) { + if (this._reader !== undefined) { + return Promise.reject( + new TypeError('This stream is locked to a single exclusive reader and cannot be cancelled directly')); + } + if (this._state === 'closed') { return Promise.resolve(undefined); } @@ -65,13 +76,23 @@ export default class ReadableStream { } this._queue = []; - this._state = 'closed'; - this._resolveClosedPromise(undefined); + CloseReadableStream(this); var sourceCancelPromise = helpers.promiseCall(this._onCancel, reason); return sourceCancelPromise.then(() => undefined); } + getReader() { + if (this._state === 'closed') { + throw new TypeError('The stream has already been closed, so a reader cannot be acquired.'); + } + if (this._state === 'errored') { + throw this._storedError; + } + + return new ExclusiveStreamReader(this); + } + pipeThrough({ writable, readable }, options) { if (!helpers.typeIsObject(writable)) { throw new TypeError('A transform stream must have an writable property that is an object.'); @@ -86,11 +107,11 @@ export default class ReadableStream { } pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { - var source = this; preventClose = Boolean(preventClose); preventAbort = Boolean(preventAbort); preventCancel = Boolean(preventCancel); + var source; var resolvePipeToPromise; var rejectPipeToPromise; @@ -98,6 +119,7 @@ export default class ReadableStream { resolvePipeToPromise = resolve; rejectPipeToPromise = reject; + source = this.getReader(); doPipe(); }); @@ -137,12 +159,16 @@ export default class ReadableStream { function cancelSource(reason) { if (preventCancel === false) { + // implicitly releases the lock source.cancel(reason); + } else { + source.releaseLock(); } rejectPipeToPromise(reason); } function closeDest() { + source.releaseLock(); if (preventClose === false) { dest.close().then(resolvePipeToPromise, rejectPipeToPromise); } else { @@ -151,6 +177,7 @@ export default class ReadableStream { } function abortDest(reason) { + source.releaseLock(); if (preventAbort === false) { dest.abort(reason); } @@ -159,6 +186,10 @@ export default class ReadableStream { } read() { + if (this._reader !== undefined) { + throw new TypeError('This stream is locked to a single exclusive reader and cannot be read from directly'); + } + if (this._state === 'waiting') { throw new TypeError('no chunks available (yet)'); } @@ -176,8 +207,7 @@ export default class ReadableStream { if (this._queue.length === 0) { if (this._draining === true) { - this._state = 'closed'; - this._resolveClosedPromise(undefined); + CloseReadableStream(this); } else { this._state = 'waiting'; this._initReadyPromise(); @@ -190,6 +220,10 @@ export default class ReadableStream { } get ready() { + if (this._reader !== undefined) { + return this._reader._lockReleased; + } + return this._readyPromise; } @@ -261,8 +295,7 @@ function CreateReadableStreamCloseFunction(stream) { return () => { if (stream._state === 'waiting') { stream._resolveReadyPromise(undefined); - stream._resolveClosedPromise(undefined); - stream._state = 'closed'; + return CloseReadableStream(stream); } if (stream._state === 'readable') { stream._draining = true; @@ -312,16 +345,18 @@ function CreateReadableStreamEnqueueFunction(stream) { function CreateReadableStreamErrorFunction(stream) { return e => { if (stream._state === 'waiting') { - stream._state = 'errored'; - stream._storedError = e; stream._resolveReadyPromise(undefined); - stream._rejectClosedPromise(e); } - else if (stream._state === 'readable') { + if (stream._state === 'readable') { stream._queue = []; + } + if (stream._state === 'waiting' || stream._state === 'readable') { stream._state = 'errored'; stream._storedError = e; stream._rejectClosedPromise(e); + if (stream._reader !== undefined) { + stream._reader.releaseLock(); + } } }; } @@ -339,6 +374,17 @@ function ShouldReadableStreamApplyBackpressure(stream) { return shouldApplyBackpressure; } +function CloseReadableStream(stream) { + stream._state = 'closed'; + stream._resolveClosedPromise(undefined); + + if (stream._reader !== undefined) { + stream._reader.releaseLock(); + } + + return undefined; +} + var defaultReadableStreamStrategy = { shouldApplyBackpressure(queueSize) { assert(typeof queueSize === 'number' && !Number.isNaN(queueSize)); diff --git a/reference-implementation/test/exclusive-stream-reader.js b/reference-implementation/test/exclusive-stream-reader.js new file mode 100644 index 000000000..0d961a694 --- /dev/null +++ b/reference-implementation/test/exclusive-stream-reader.js @@ -0,0 +1,361 @@ +var test = require('tape'); + +import ReadableStream from '../lib/readable-stream'; + +test('Using the reader directly on a mundane stream', t => { + t.plan(22); + + var rs = new ReadableStream({ + start(enqueue, close) { + enqueue('a'); + setTimeout(() => enqueue('b'), 30); + setTimeout(close, 60); + } + }); + + t.equal(rs.state, 'readable', 'stream starts out readable'); + + var reader = rs.getReader(); + + t.equal(reader.isActive, true, 'reader isActive is true'); + + t.equal(rs.state, 'waiting', 'after getting a reader, the stream state is waiting'); + t.equal(reader.state, 'readable', 'the reader state is readable'); + + t.throws(() => rs.read(), /TypeError/, 'trying to read from the stream directly throws a TypeError'); + t.equal(reader.read(), 'a', 'trying to read from the reader works and gives back the first enqueued value'); + t.equal(reader.state, 'waiting', 'the reader state is now waiting since the queue has been drained'); + rs.cancel().then( + () => t.fail('cancel() should not be fulfilled'), + e => t.equal(e.constructor, TypeError, 'cancel() should be rejected with a TypeError') + ); + + reader.ready.then(() => { + t.equal(reader.state, 'readable', 'ready for reader is fulfilled when second chunk is enqueued'); + t.equal(rs.state, 'waiting', 'the stream state is still waiting'); + t.equal(reader.read(), 'b', 'you can read the second chunk from the reader'); + }); + + reader.closed.then(() => { + t.pass('closed for the reader is fulfilled'); + t.equal(reader.state, 'closed', 'the reader state is closed'); + t.equal(rs.state, 'closed', 'the stream state is closed'); + t.equal(reader.isActive, false, 'the reader is no longer active'); + + t.doesNotThrow(() => reader.releaseLock(), 'trying to release the lock twice does nothing'); + }); + + rs.ready.then(() => { + t.equal(rs.state, 'closed', 'ready for stream is not fulfilled until the stream closes'); + t.equal(reader.isActive, false, 'the reader is no longer active after the stream has closed'); + }); + + rs.closed.then(() => { + t.pass('closed for the stream is fulfilled'); + t.equal(rs.state, 'closed', 'the stream state is closed'); + t.equal(reader.state, 'closed', 'the reader state is closed'); + t.equal(reader.isActive, false, 'the reader is no longer active'); + }); +}); + +test('Readers delegate to underlying stream implementations', t => { + t.plan(3 * 3 + 2 * 4); + + var rs = new ReadableStream(); + var reader = rs.getReader(); + + testGetter('ready'); + testGetter('state'); + testGetter('closed'); + testMethod('read'); + testMethod('cancel'); + + // Generates 4 assertions + function testGetter(propertyName) { + Object.defineProperty(rs, propertyName, { + get() { + t.pass('overriden ' + propertyName + ' called'); + t.equal(this, rs, propertyName + ' called with the correct this value'); + return propertyName + ' return value'; + } + }); + t.equal(reader[propertyName], propertyName + ' return value', + `reader's ${propertyName} returns the return value of the stream's ${propertyName}`); + } + + // Generates 5 assertions + function testMethod(methodName) { + var testArgs = ['arg1', 'arg2', 'arg3']; + rs[methodName] = function (...args) { + t.pass('overridden ' + methodName + ' called'); + t.deepEqual(args, testArgs, methodName + ' called with the correct arguments'); + t.equal(this, rs, methodName + ' called with the correct this value'); + return methodName + ' return value'; + } + t.equal(reader[methodName](...testArgs), methodName + ' return value', + `reader's ${methodName} returns the return value of the stream's ${methodName}`); + } +}); + +test('Reading from a reader for an empty stream throws but doesn\'t break anything', t => { + var enqueue; + var rs = new ReadableStream({ + start(e) { + enqueue = e; + } + }); + var reader = rs.getReader(); + + t.equal(reader.isActive, true, 'reader is active to start with'); + t.equal(reader.state, 'waiting', 'reader state is waiting to start with'); + t.throws(() => reader.read(), /TypeError/, 'calling reader.read() throws a TypeError'); + t.equal(reader.isActive, true, 'reader is still active'); + t.equal(reader.state, 'waiting', 'reader state is still waiting'); + + enqueue('a'); + + reader.ready.then(() => { + t.equal(reader.state, 'readable', 'after enqueuing the reader state is readable'); + t.equal(reader.read(), 'a', 'the enqueued chunk can be read back through the reader'); + t.end(); + }); +}); + +test('Trying to use a released reader should work for ready/state/closed but fail for read/cancel', t => { + t.plan(9); + + var rs = new ReadableStream({ + start(enqueue, close) { + enqueue('a'); + enqueue('b'); + setTimeout(close, 40); + } + }); + var reader = rs.getReader(); + reader.releaseLock(); + + t.equal(reader.isActive, false, 'isActive returns false'); + t.equal(reader.state, 'readable', 'reader.state returns readable'); + t.equal(rs.state, 'readable', 'rs.state returns readable'); + + t.throws(() => reader.read(), /TypeError/, 'trying to read gives a TypeError'); + reader.cancel().then( + () => t.fail('reader.cancel() should not be fulfilled'), + e => t.equal(e.constructor, TypeError, 'reader.cancel() should be rejected with a TypeError') + ); + + reader.ready.then(() => { + t.pass('reader.ready should be fulfilled'); + t.equal(rs.read(), 'a', 'reading from the stream should give back the first enqueued chunk'); + t.equal(rs.read(), 'b', 'reading from the stream should give back the second enqueued chunk'); + }); + reader.closed.then(() => t.pass('reader.closed should be fulfilled')); +}); + +test('cancel() on a reader implicitly releases the reader before calling through', t => { + t.plan(3); + + var passedReason = new Error('it wasn\'t the right time, sorry'); + var rs = new ReadableStream({ + cancel(reason) { + t.equal(reason, passedReason, 'the cancellation reason is passed through to the underlying source'); + } + }); + + var reader = rs.getReader(); + reader.cancel(passedReason).then( + () => t.pass('reader.cancel() should fulfill'), + e => t.fail('reader.cancel() should not reject') + ); + + t.equal(reader.isActive, false, 'canceling via the reader should release the reader\'s lock'); +}); + +test('cancel() on a reader calls this.releaseLock directly instead of cheating', t => { + t.plan(3); + + var rs = new ReadableStream(); + + var reader = rs.getReader(); + reader.releaseLock = function (...args) { + t.pass('releaseLock was called directly'); + t.equal(args.length, 0, 'no arguments were passed'); + t.equal(this, reader, 'the correct this value was passed'); + }; + + reader.cancel(); +}); + +test('getReader() on a closed stream should fail', t => { + var rs = new ReadableStream({ + start(enqueue, close) { + close(); + } + }); + + t.equal(rs.state, 'closed', 'the stream should be closed'); + t.throws(() => rs.getReader(), /TypeError/, 'getReader() threw a TypeError'); + t.end(); +}); + +test('getReader() on a cancelled stream should fail (since cancelling closes)', t => { + var rs = new ReadableStream(); + rs.cancel(new Error('fun time is over')); + + t.equal(rs.state, 'closed', 'the stream should be closed'); + t.throws(() => rs.getReader(), /TypeError/, 'getReader() threw a TypeError'); + t.end(); +}); + +test('getReader() on an errored stream should rethrow the error', t => { + var theError = new Error('don\'t say i didn\'t warn ya'); + var rs = new ReadableStream({ + start(enqueue, close, error) { + error(theError); + } + }); + + t.equal(rs.state, 'errored', 'the stream should be errored'); + t.throws(() => rs.getReader(), /don't say i didn't warn ya/, 'getReader() threw the error'); + t.end(); +}); + +test('closed should be fulfilled after reader releases its lock (both .closed accesses after acquiring)', t => { + t.plan(2); + + var doClose; + var rs = new ReadableStream({ + start(enqueue, close) { + doClose = close; + } + }); + + var reader = rs.getReader(); + doClose(); + + reader.closed.then(() => { + t.equal(reader.isActive, false, 'reader is no longer active when reader closed is fulfilled'); + }); + + rs.closed.then(() => { + t.equal(reader.isActive, false, 'reader is no longer active when stream closed is fulfilled'); + }); +}); + +test('closed should be fulfilled after reader releases its lock (stream .closed access before acquiring)', t => { + t.plan(2); + + var doClose; + var rs = new ReadableStream({ + start(enqueue, close) { + doClose = close; + } + }); + + rs.closed.then(() => { + t.equal(reader.isActive, false, 'reader is no longer active when stream closed is fulfilled'); + }); + + var reader = rs.getReader(); + doClose(); + + reader.closed.then(() => { + t.equal(reader.isActive, false, 'reader is no longer active when reader closed is fulfilled'); + }); +}); + +test('closed should be fulfilled after reader releases its lock (multiple stream locks)', t => { + t.plan(6); + + var doClose; + var rs = new ReadableStream({ + start(enqueue, close) { + doClose = close; + } + }); + + var reader1 = rs.getReader(); + + rs.closed.then(() => { + t.equal(reader1.isActive, false, 'reader1 is no longer active when stream closed is fulfilled'); + t.equal(reader2.isActive, false, 'reader2 is no longer active when stream closed is fulfilled'); + }); + + reader1.releaseLock(); + + var reader2 = rs.getReader(); + doClose(); + + reader1.closed.then(() => { + t.equal(reader1.isActive, false, 'reader1 is no longer active when reader1 closed is fulfilled'); + t.equal(reader2.isActive, false, 'reader2 is no longer active when reader1 closed is fulfilled'); + }); + + reader2.closed.then(() => { + t.equal(reader1.isActive, false, 'reader1 is no longer active when reader2 closed is fulfilled'); + t.equal(reader2.isActive, false, 'reader2 is no longer active when reader2 closed is fulfilled'); + }); +}); + +test('Multiple readers can access the stream in sequence', t => { + var rs = new ReadableStream({ + start(enqueue, close) { + enqueue('a'); + enqueue('b'); + enqueue('c'); + enqueue('d'); + enqueue('e'); + close(); + } + }); + + t.equal(rs.read(), 'a', 'reading the first chunk directly from the stream works'); + + var reader1 = rs.getReader(); + t.equal(reader1.read(), 'b', 'reading the second chunk from reader1 works'); + reader1.releaseLock(); + + t.equal(rs.read(), 'c', 'reading the third chunk from the stream after releasing reader1 works'); + + var reader2 = rs.getReader(); + t.equal(reader2.read(), 'd', 'reading the fourth chunk from reader2 works'); + reader2.releaseLock(); + + t.equal(rs.read(), 'e', 'reading the fifth chunk from the stream after releasing reader2 works'); + + t.end(); +}); + +test('A stream that errors has that reflected in the reader and the stream', t => { + t.plan(9); + + var error; + var rs = new ReadableStream({ + start(enqueue, close, error_) { + error = error_; + } + }); + + var reader = rs.getReader(); + + var passedError = new Error('too exclusive'); + error(passedError); + + t.equal(reader.isActive, false, 'the reader should have lost its lock'); + t.throws(() => reader.read(), /TypeError/, + 'reader.read() should throw a TypeError since the reader no longer has a lock'); + t.equal(reader.state, 'errored', 'the reader\'s state should be errored'); + reader.ready.then(() => t.pass('reader.ready should fulfill')); + reader.closed.then( + () => t.fail('reader.closed should not be fulfilled'), + e => t.equal(e, passedError, 'reader.closed should be rejected with the stream error') + ); + + t.throws(() => rs.read(), /too exclusive/, 'rs.read() should throw the stream error'); + t.equal(rs.state, 'errored', 'the stream\'s state should be errored'); + rs.ready.then(() => t.pass('rs.ready should fulfill')); + rs.closed.then( + () => t.fail('rs.closed should not be fulfilled'), + e => t.equal(e, passedError, 'rs.closed should be rejected with the stream error') + ); +}); diff --git a/reference-implementation/test/pipe-to.js b/reference-implementation/test/pipe-to.js index 7436b0ae8..7f81ced6c 100644 --- a/reference-implementation/test/pipe-to.js +++ b/reference-implementation/test/pipe-to.js @@ -128,34 +128,30 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro }); test('Piping from a ReadableStream in closed state to a WritableStream in writable state', t => { + t.plan(3); + var rs = new ReadableStream({ start(enqueue, close) { close(); }, pull() { t.fail('Unexpected pull call'); - t.end(); }, cancel(reason) { t.fail('Unexpected cancel call'); - t.end(); } }); t.equal(rs.state, 'closed'); - var closeCalled = false; var ws = new WritableStream({ write() { t.fail('Unexpected write call'); - t.end(); }, close() { - t.assert(!closeCalled); - closeCalled = true; + t.fail('Unexpected close call'); }, abort() { t.fail('Unexpected abort call'); - t.end(); } }); @@ -163,42 +159,39 @@ test('Piping from a ReadableStream in closed state to a WritableStream in writab setTimeout(() => { t.equal(ws.state, 'writable'); - rs.pipeTo(ws); - t.assert(closeCalled); - t.equal(ws.state, 'closing'); - t.end(); + rs.pipeTo(ws).then( + () => t.fail('pipeTo promise should not be fulfilled'), + e => t.equal(e.constructor, TypeError, 'pipeTo promise should be rejected with a TypeError') + ); }, 0); }); test('Piping from a ReadableStream in errored state to a WritableStream in writable state', t => { + t.plan(3); + + var theError = new Error('piping is too hard today'); var rs = new ReadableStream({ start(enqueue, close, error) { - error(); + error(theError); }, pull() { t.fail('Unexpected pull call'); - t.end(); }, cancel(reason) { t.fail('Unexpected cancel call'); - t.end(); } }); t.equal(rs.state, 'errored'); - var abortCalled = false; var ws = new WritableStream({ write() { t.fail('Unexpected write call'); - t.end(); }, close() { t.fail('Unexpected close call'); - t.end(); }, abort() { - t.assert(!abortCalled); - abortCalled = true; + t.fail('Unexpected abort call'); } }); @@ -206,14 +199,10 @@ test('Piping from a ReadableStream in errored state to a WritableStream in writa setTimeout(() => { t.equal(ws.state, 'writable'); - rs.pipeTo(ws); - - // Need to delay because pipeTo retrieves error from dest using ready. - setTimeout(() => { - t.assert(abortCalled); - t.equal(ws.state, 'errored'); - t.end(); - }, 0); + rs.pipeTo(ws).then( + () => t.fail('pipeTo promise should not be fulfilled'), + e => t.equal(e, theError, 'pipeTo promise should be rejected with the passed error') + ); }, 0); }); @@ -365,6 +354,8 @@ test('Piping from a ReadableStream in waiting state which becomes readable after test('Piping from a ReadableStream in waiting state which becomes errored after pipeTo call to a WritableStream in ' + 'writable state', t => { + t.plan(4); + var errorReadableStream; var rs = new ReadableStream({ start(enqueue, close, error) { @@ -392,7 +383,6 @@ test('Piping from a ReadableStream in waiting state which becomes errored after }, abort(reason) { t.equal(reason, passedError); - t.end(); } }); @@ -504,7 +494,7 @@ test('Piping from a ReadableStream in readable state to a WritableStream in wait t.equal(ws.state, 'waiting'); rs.pipeTo(ws); - t.equal(rs.state, 'readable', 'transfer of data must not happen until ws becomes writable'); + t.equal(rs.state, 'waiting', 'readable stream must say it is waitable while piping (even with a nonempty queue)'); t.equal(ws.state, 'waiting'); resolveWritePromise(); @@ -562,8 +552,9 @@ test('Piping from a ReadableStream in readable state to a WritableStream in wait setTimeout(() => { t.equal(ws.state, 'waiting'); + t.equal(rs.state, 'readable', 'readable stream should be readable before piping starts'); rs.pipeTo(ws); - t.equal(rs.state, 'readable', 'transfer of data must not happen until ws becomes writable'); + t.equal(rs.state, 'waiting', 'readable stream must say it is waitable while piping (even with a nonempty queue)'); t.equal(ws.state, 'waiting'); errorWritableStream(); @@ -573,6 +564,8 @@ test('Piping from a ReadableStream in readable state to a WritableStream in wait test('Piping from a ReadableStream in readable state which becomes errored after pipeTo call to a WritableStream in ' + 'waiting state', t => { + t.plan(10); + var errorReadableStream; var pullCount = 0; var rs = new ReadableStream({ @@ -602,10 +595,9 @@ test('Piping from a ReadableStream in readable state which becomes errored after }, close() { t.fail('Unexpected close call'); - t.end(); }, abort() { - t.end(); + t.pass('underlying source abort was called'); } }); ws.write('Hello'); @@ -615,8 +607,9 @@ test('Piping from a ReadableStream in readable state which becomes errored after t.equal(ws.state, 'waiting'); t.equal(pullCount, 1); + t.equal(rs.state, 'readable', 'readable stream should be readable before piping starts'); rs.pipeTo(ws); - t.equal(rs.state, 'readable', 'transfer of data must not happen until ws becomes writable'); + t.equal(rs.state, 'waiting', 'readable stream must say it is waitable while piping (even with a nonempty queue)'); t.equal(ws.state, 'waiting'); errorReadableStream(); @@ -738,6 +731,8 @@ test('Piping from a ReadableStream in waiting state to a WritableStream in waiti test('Piping from a ReadableStream in waiting state which becomes closed after pipeTo call to a WritableStream in ' + 'waiting state', t => { + t.plan(5); + var closeReadableStream; var pullCount = 0; var rs = new ReadableStream({ @@ -783,20 +778,22 @@ test('Piping from a ReadableStream in waiting state which becomes closed after p rs.pipeTo(ws); closeReadableStream(); + t.equal(rs.state, 'closed'); + // Check that nothing happens. setTimeout(() => { t.equal(ws.state, 'closing'); t.equal(pullCount, 1); - - t.end(); }, 100); }); }); test('Piping from a ReadableStream in waiting state which becomes errored after pipeTo call to a WritableStream in ' + 'waiting state', t => { + t.plan(6); + var errorReadableStream; var pullCount = 0; var rs = new ReadableStream({ @@ -833,7 +830,6 @@ test('Piping from a ReadableStream in waiting state which becomes errored after t.equal(reason, passedError); t.assert(writeCalled); t.equal(pullCount, 1); - t.end(); } }); ws.write('Hello'); @@ -845,6 +841,7 @@ test('Piping from a ReadableStream in waiting state which becomes errored after rs.pipeTo(ws); errorReadableStream(passedError); + t.equal(rs.state, 'errored'); }); }); @@ -1094,7 +1091,7 @@ test('Piping to a stream that errors on the last chunk does not pass through the setTimeout(() => { t.equal(cancelCalled, false, 'cancel must not be called'); - t.equal(ws.state, 'errored', 'the writable stream must still be in an errored state'); + t.equal(ws.state, 'errored'); t.end(); }, 20); });