From bb47488030e1be545bfc40450a6b4a8186a8ab63 Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Fri, 13 Mar 2015 17:10:13 +0900 Subject: [PATCH] Tweak semantics of readers, locks, and cancellation See discussion in #297. This commit implements the following changes: - Allow acquiring readers for closed or errored streams; they simply act closed or errored. - Stop auto-releasing readers when streams close/error. - Disallow canceling a stream that is locked to a reader (you should use the reader cancel). - Piping from a closed or errored stream will close or abort the destination stream, instead of immediately failing the pipe. --- index.bs | 61 ++++++++++--------- .../lib/readable-stream.js | 51 +++++++++------- .../test/readable-stream-cancel.js | 29 ++++++++- .../test/readable-stream-reader.js | 31 +++++----- .../test/readable-stream-templated.js | 2 +- .../test/readable-stream.js | 7 ++- .../test/templated/readable-stream-closed.js | 21 +++---- .../templated/readable-stream-empty-reader.js | 12 ++-- .../readable-stream-errored-sync-only.js | 11 ---- .../test/templated/readable-stream-errored.js | 20 ++++-- .../test/utils/readable-stream-to-array.js | 3 +- 11 files changed, 141 insertions(+), 107 deletions(-) diff --git a/index.bs b/index.bs index b875dc0a1..d9db80921 100644 --- a/index.bs +++ b/index.bs @@ -173,14 +173,14 @@ A queuing strategy is generally associated with a specific type of underlying A readable stream reader or simply reader is an object that allows direct reading of chunks from a readable stream. Without a reader, a consumer can only perform high-level operations on the readable stream: waiting for the stream to become closed or errored, canceling the stream, -or piping the readable stream to a writable stream. +or piping the readable stream to a writable stream. Many of those high-level operations actually use a reader +themselves. A given readable stream only has at most one reader at a time. We say in this case the stream is locked to the reader, and that the reader is active. A reader also has the capability to release its read lock, which makes it no -longer active. At this point another reader can be acquired at will. If the stream becomes closed or errored as a -result of the behavior of its underlying source, its reader (if one exists) will automatically release its lock. +longer active. At this point another reader can be acquired at will.

Readable Streams

@@ -399,6 +399,7 @@ Instances of ReadableStream are created with the internal slots des
  1. If IsReadableStream(this) is false, return a promise rejected with a TypeError exception. +
  2. If IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
  3. Return CancelReadableStream(this, reason).
@@ -407,13 +408,12 @@ Instances of ReadableStream are created with the internal slots des
The getReader method creates an readable stream reader and locks the stream to the the new reader. While the stream is locked, no other reader - can be acquired until this one is released. + can be acquired until this one is released. The returned reader provides the ability + to directly read individual chunks from the stream via the reader's read method. - The returned reader provides the ability to directly read individual chunks from the stream via the reader's - read method. This design ensures that if you control the reader, nobody else can interleave reads with - yours, interfering with your code or observing its side-effects. - - Note that when a stream is closed or errors, any reader it is locked to is automatically released. + This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its + entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours or cancel + the stream, which would interfere with your abstraction.
    @@ -445,7 +445,8 @@ Instances of ReadableStream are created with the internal slots des } - Note how the first thing it does is obtain a reader, and from then on it uses the reader exclusively. + Note how the first thing it does is obtain a reader, and from then on it uses the reader exclusively. This ensures + that no other consumer can interfere with the stream, either by reading chunks (causing )
    pipeThrough({ writable, readable }, options)
    @@ -563,13 +564,15 @@ Instances of ReadableStreamReader are created with the internal slo
    1. If IsReadableStream(stream) is false, throw a TypeError exception. -
    2. If stream@\[[state]] is "closed", throw a TypeError exception. -
    3. If stream@\[[state]] is "errored", throw stream@\[[storedError]].
    4. If IsReadableStreamLocked(stream) is true, throw a TypeError exception.
    5. Set stream@\[[reader]] to this.
    6. Set this@\[[ownerReadableStream]] to stream.
    7. Set this@\[[readRequests]] to a new empty List.
    8. Set this@\[[closedPromise]] to a new promise. +
    9. If stream@\[[state]] is "closed", resolve this@\[[closedPromise]] with + undefined. +
    10. If stream@\[[state]] is "errored", reject this@\[[closedPromise]] with + stream@\[[storedError]].

    Properties of the ReadableStreamReader Prototype

    @@ -604,7 +607,7 @@ Instances of ReadableStreamReader are created with the internal slo
    If the reader is active, the cancel method behaves the same as that for the - associated stream. When done, it automatically releases the lock. + associated stream.
      @@ -674,7 +677,9 @@ Instances of ReadableStreamReader are created with the internal slo
    1. If IsReadableStreamReader(this) is false, throw a TypeError exception.
    2. If this@\[[ownerReadableStream]] is undefined, return undefined.
    3. If this@\[[readRequests]] is not empty, throw a TypeError exception. -
    4. Return ReleaseReadableStreamReader(this). +
    5. Call-with-rethrow CloseReadableStreamReader(this). +
    6. Set this@\[[ownerReadableStream]]@\[[reader]] to undefined. +
    7. Set this@\[[ownerReadableStream]] to undefined.

    Readable Stream Abstract Operations

    @@ -739,7 +744,19 @@ Instances of ReadableStreamReader are created with the internal slo
  1. Resolve stream@\[[closedPromise]] with undefined.
  2. Set stream@\[[state]] to "closed".
  3. If IsReadableStreamLocked(stream) is true, return - ReleaseReadableStreamReader(stream). + CloseReadableStreamReader(stream). +
  4. Return undefined. +
+ +

CloseReadableStreamReader ( reader )

+ +
    +
  1. Repeat for each readRequestPromise that is an element of reader@\[[readRequests]], +
      +
    1. Resolve readRequestPromise with CreateIterResultObject(undefined, true). +
    +
  2. Set reader@\[[readRequests]] to a new empty List. +
  3. Resolve reader@\[[closedPromise]] with undefined.
  4. Return undefined.
@@ -872,20 +889,6 @@ a variable stream, that performs the following steps:
  • Return true. -

    ReleaseReadableStreamReader ( reader )

    - -
      -
    1. Assert: reader@\[[ownerReadableStream]] is not undefined. -
    2. Repeat for each readRequestPromise that is an element of reader@\[[readRequests]], -
        -
      1. Resolve readRequestPromise with CreateIterResultObject(undefined, true). -
      -
    3. Set reader@\[[readRequests]] to a new empty List. -
    4. Set reader@\[[ownerReadableStream]]@\[[reader]] to undefined. -
    5. Set reader@\[[ownerReadableStream]] to undefined. -
    6. Resolve reader@\[[closedPromise]] with undefined. -
    -

    ShouldReadableStreamApplyBackpressure ( stream )

      diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 427adf76e..64e740bd4 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -42,6 +42,10 @@ export default class ReadableStream { return Promise.reject(new TypeError('ReadableStream.prototype.cancel can only be used on a ReadableStream')); } + if (IsReadableStreamLocked(this) === true) { + return Promise.reject(new TypeError('Cannot cancel a stream that already has a reader')); + } + return CancelReadableStream(this, reason); } @@ -107,8 +111,8 @@ export default class ReadableStream { function cancelSource(reason) { if (preventCancel === false) { - // cancelling automatically releases the lock (and that doesn't fail, since source is then closed) - source.cancel(reason); + reader.cancel(reason); + reader.releaseLock(); rejectPipeToPromise(reason); } else { // If we don't cancel, we need to wait for lastRead to finish before we're allowed to release. @@ -177,12 +181,6 @@ class ReadableStreamReader { if (IsReadableStream(stream) === false) { throw new TypeError('ReadableStreamReader can only be constructed with a ReadableStream instance'); } - if (stream._state === 'closed') { - throw new TypeError('The stream has already been closed, so a reader cannot be acquired'); - } - if (stream._state === 'errored') { - throw stream._storedError; - } if (IsReadableStreamLocked(stream) === true) { throw new TypeError('This stream has already been locked for exclusive reading by another reader'); } @@ -196,6 +194,14 @@ class ReadableStreamReader { this._closedPromise_resolve = resolve; this._closedPromise_reject = reject; }); + + if (stream._state === 'closed') { + this._closedPromise_resolve(undefined); + } + + if (stream._state === 'errored') { + this._closedPromise_reject(stream._storedError); + } } get closed() { @@ -277,7 +283,10 @@ class ReadableStreamReader { throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled'); } - ReleaseReadableStreamReader(this); + CloseReadableStreamReader(this); + + this._ownerReadableStream._reader = undefined; + this._ownerReadableStream = undefined; } } @@ -337,12 +346,21 @@ function CloseReadableStream(stream) { stream._state = 'closed'; if (IsReadableStreamLocked(stream) === true) { - return ReleaseReadableStreamReader(stream._reader); + return CloseReadableStreamReader(stream._reader); } return undefined; } +function CloseReadableStreamReader(reader) { + for (const { _resolve } of reader._readRequests) { + _resolve(CreateIterResultObject(undefined, true)); + } + reader._readRequests = []; + + reader._closedPromise_resolve(undefined); +} + function CreateReadableStreamCloseFunction(stream) { return () => { if (stream._state !== 'readable') { @@ -468,19 +486,6 @@ function IsReadableStreamReader(x) { return true; } -function ReleaseReadableStreamReader(reader) { - assert(reader._ownerReadableStream !== undefined); - - for (const { _resolve } of reader._readRequests) { - _resolve(CreateIterResultObject(undefined, true)); - } - reader._readRequests = []; - - reader._ownerReadableStream._reader = undefined; - reader._ownerReadableStream = undefined; - reader._closedPromise_resolve(undefined); -} - function ShouldReadableStreamApplyBackpressure(stream) { const queueSize = GetTotalQueueSize(stream._queue); let shouldApplyBackpressure = queueSize > 1; diff --git a/reference-implementation/test/readable-stream-cancel.js b/reference-implementation/test/readable-stream-cancel.js index 6d192de56..a5e75624c 100644 --- a/reference-implementation/test/readable-stream-cancel.js +++ b/reference-implementation/test/readable-stream-cancel.js @@ -29,8 +29,9 @@ test('ReadableStream cancellation: integration test on an infinite stream derive }, 50)); } }); + const reader = rs.getReader(); - readableStreamToArray(rs).then( + readableStreamToArray(rs, reader).then( chunks => { t.equal(cancellationFinished, false, 'it did not wait for the cancellation process to finish before closing'); t.ok(chunks.length > 0, 'at least one chunk should be read'); @@ -42,7 +43,7 @@ test('ReadableStream cancellation: integration test on an infinite stream derive ); setTimeout(() => { - rs.cancel().then(() => { + reader.cancel().then(() => { t.equal(cancellationFinished, true, 'it returns a promise that is fulfilled when the cancellation finishes'); t.end(); }) @@ -66,6 +67,30 @@ test('ReadableStream cancellation: cancel(reason) should pass through the given t.end(); }); +test('ReadableStream cancellation: cancel() on a locked stream should fail and not call the underlying source cancel', + t => { + t.plan(3); + const rs = new ReadableStream({ + start(enqueue, close) { + enqueue('a'); + close(); + }, + cancel() { + t.fail('underlying source cancel() should not have been called'); + } + }); + + const reader = rs.getReader(); + + rs.cancel().catch(e => t.equal(e.constructor, TypeError, 'cancel() should be rejected with a TypeError')); + + reader.read().then( + result => t.deepEqual(result, { value: 'a', done: false }, 'read() should still work after the attempted cancel') + ); + + rs.closed.then(() => t.pass('closed should fulfill without underlying source cancel ever being called')); +}); + test('ReadableStream cancellation: returning a value from the underlying source\'s cancel should not affect the ' + 'fulfillment value of the promise returned by the stream\'s cancel', t => { t.plan(1); diff --git a/reference-implementation/test/readable-stream-reader.js b/reference-implementation/test/readable-stream-reader.js index c886d36fc..7d1abae41 100644 --- a/reference-implementation/test/readable-stream-reader.js +++ b/reference-implementation/test/readable-stream-reader.js @@ -34,7 +34,7 @@ test('Constructing an ReadableStreamReader directly should fail if the stream is t.end(); }); -test('Constructing an ReadableStreamReader directly should fail if the stream is already closed', +test('Constructing an ReadableStreamReader directly should be OK if the stream is closed', t => { const rs = new ReadableStream({ start(enqueue, close) { @@ -42,11 +42,11 @@ test('Constructing an ReadableStreamReader directly should fail if the stream is } }); - t.throws(() => new ReadableStreamReader(rs), /TypeError/, 'constructing directly should fail'); + t.doesNotThrow(() => new ReadableStreamReader(rs), 'constructing directly should not throw'); t.end(); }); -test('Constructing an ReadableStreamReader directly should fail if the stream is already errored', +test('Constructing an ReadableStreamReader directly should be OK if the stream is errored', t => { const theError = new Error('don\'t say i didn\'t warn ya'); const rs = new ReadableStream({ @@ -55,7 +55,7 @@ test('Constructing an ReadableStreamReader directly should fail if the stream is } }); - t.throws(() => new ReadableStreamReader(rs), /don't say i didn't warn ya/, 'getReader() threw the error'); + t.doesNotThrow(() => new ReadableStreamReader(rs), 'constructing directly should not throw'); t.end(); }); @@ -79,20 +79,23 @@ test('Reading from a reader for an empty stream will wait until a chunk is avail enqueue('a'); }); -test('cancel() on a reader releases the reader before calling through', t => { - t.plan(3); +test('cancel() on a reader should leave the reader active', t => { + t.plan(4); const passedReason = new Error('it wasn\'t the right time, sorry'); const rs = new ReadableStream({ cancel(reason) { - t.equal(reader.isActive, false, 'reader should be released by the time underlying source cancel is called'); - t.equal(reason, passedReason, 'the cancellation reason is passed through to the underlying source'); + t.equal(reader.isActive, true, 'reader should still be active when underlying source cancel is called'); + t.equal(reason, passedReason, 'the cancellation reason should be passed through to the underlying source'); } }); const reader = rs.getReader(); reader.cancel(passedReason).then( - () => t.pass('reader.cancel() should fulfill'), + () => { + t.pass('reader.cancel() should fulfill'); + t.equal(reader.isActive, true, 'the reader should still be active after cancel() fulfills'); + }, e => t.fail('reader.cancel() should not reject') ); }); @@ -108,14 +111,14 @@ test('closed should be fulfilled after stream is closed (stream .closed access b }); rs.closed.then(() => { - t.equal(reader.isActive, false, 'reader is no longer active when stream closed is fulfilled'); + t.equal(reader.isActive, true, 'reader is still active when stream closed is fulfilled'); }); const reader = rs.getReader(); doClose(); reader.closed.then(() => { - t.equal(reader.isActive, false, 'reader is no longer active when reader closed is fulfilled'); + t.equal(reader.isActive, true, 'reader is still active when reader closed is fulfilled'); }); }); @@ -133,7 +136,7 @@ test('closed should be fulfilled after reader releases its lock (multiple stream rs.closed.then(() => { t.equal(reader1.isActive, false, 'reader1 is no longer active when stream closed is fulfilled'); - t.equal(reader2.isActive, false, 'reader2 is no longer active when stream closed is fulfilled'); + t.equal(reader2.isActive, true, 'reader2 is still active when stream closed is fulfilled'); }); reader1.releaseLock(); @@ -143,12 +146,12 @@ test('closed should be fulfilled after reader releases its lock (multiple stream reader1.closed.then(() => { t.equal(reader1.isActive, false, 'reader1 is no longer active when reader1 closed is fulfilled'); - t.equal(reader2.isActive, false, 'reader2 is no longer active when reader1 closed is fulfilled'); + t.equal(reader2.isActive, true, 'reader2 is active when reader1 closed is fulfilled'); }); reader2.closed.then(() => { t.equal(reader1.isActive, false, 'reader1 is no longer active when reader2 closed is fulfilled'); - t.equal(reader2.isActive, false, 'reader2 is no longer active when reader2 closed is fulfilled'); + t.equal(reader2.isActive, true, 'reader2 is still active when reader2 closed is fulfilled'); }); }); diff --git a/reference-implementation/test/readable-stream-templated.js b/reference-implementation/test/readable-stream-templated.js index 192f2d113..1b4c42c0d 100644 --- a/reference-implementation/test/readable-stream-templated.js +++ b/reference-implementation/test/readable-stream-templated.js @@ -50,7 +50,7 @@ templatedRSClosedReader('ReadableStream (closed via cancel) reader', () => { const stream = new ReadableStream(); const result = streamAndDefaultReader(stream); - stream.cancel(); + result.reader.cancel(); return result; } ); diff --git a/reference-implementation/test/readable-stream.js b/reference-implementation/test/readable-stream.js index a883ea609..3516d1aae 100644 --- a/reference-implementation/test/readable-stream.js +++ b/reference-implementation/test/readable-stream.js @@ -496,7 +496,12 @@ test('ReadableStream: should call underlying source methods as methods', t => { theSource.debugName = 'the source object passed to the constructor'; // makes test failures easier to diagnose const rs = new ReadableStream(theSource); - rs.getReader().read().then(() => rs.cancel()); + const reader = rs.getReader(); + reader.read().then(() => { + reader.releaseLock(); + rs.cancel(); + }) + .catch(e => t.error(e)); }); test('ReadableStream strategies: the default strategy should return false for all but the first enqueue call', t => { diff --git a/reference-implementation/test/templated/readable-stream-closed.js b/reference-implementation/test/templated/readable-stream-closed.js index 8a250c836..134b30613 100644 --- a/reference-implementation/test/templated/readable-stream-closed.js +++ b/reference-implementation/test/templated/readable-stream-closed.js @@ -30,15 +30,15 @@ export default (label, factory) => { t.notEqual(cancelPromise2, closedPromise, 'cancel() promise 2 should be distinct from closed'); }); - test('getReader() should throw a TypeError', t => { + test('getReader() should be OK', t => { t.plan(1); const rs = factory(); - t.throws(() => rs.getReader(), /TypeError/, 'getReader() should fail'); + t.doesNotThrow(() => rs.getReader(), 'getReader() should not throw'); }); - test('piping to a WritableStream in the writable state should fail', t => { - t.plan(3); + test('piping to a WritableStream in the writable state should close the writable stream', t => { + t.plan(4); const rs = factory(); const startPromise = Promise.resolve(); @@ -50,7 +50,7 @@ export default (label, factory) => { t.fail('Unexpected write call'); }, close() { - t.fail('Unexpected close call'); + t.pass('underlying source close should be called'); }, abort() { t.fail('Unexpected abort call'); @@ -60,13 +60,10 @@ export default (label, factory) => { startPromise.then(() => { t.equal(ws.state, 'writable', 'writable stream should start in writable state'); - rs.pipeTo(ws).then( - () => t.fail('pipeTo promise should not fulfill'), - e => { - t.equal(e.constructor, TypeError, 'pipeTo promise should be rejected with a TypeError'); - t.equal(ws.state, 'writable', 'writable stream should still be writable'); - } - ); + rs.pipeTo(ws).then(() => { + t.pass('pipeTo promise should be fulfilled'); + t.equal(ws.state, 'closed', 'writable stream should become closed'); + }); }); }); }; diff --git a/reference-implementation/test/templated/readable-stream-empty-reader.js b/reference-implementation/test/templated/readable-stream-empty-reader.js index ead5c6eca..0218d40e6 100644 --- a/reference-implementation/test/templated/readable-stream-empty-reader.js +++ b/reference-implementation/test/templated/readable-stream-empty-reader.js @@ -110,25 +110,23 @@ export default (label, factory) => { stream.closed.then(() => t.fail('stream.closed got after release should not fulfill')); }); - test('canceling via the reader should cause the reader to become inactive', t => { + test('canceling via the reader should leave the reader active', t => { t.plan(3); const { reader } = factory(); t.equal(reader.isActive, true, 'the reader should be active before releasing it'); reader.cancel(); - t.equal(reader.isActive, false, 'the reader should no longer be active'); + t.equal(reader.isActive, true, 'the reader should still be active'); reader.read().then(r => t.deepEqual(r, { value: undefined, done: true }, 'read()ing from the reader should give a done result')) }); - test('canceling via the stream should cause the reader to become inactive', t => { + test('canceling via the stream should fail', t => { t.plan(3); const { stream, reader } = factory(); t.equal(reader.isActive, true, 'the reader should be active before releasing it'); - stream.cancel(); - t.equal(reader.isActive, false, 'the reader should no longer be active'); - reader.read().then(r => t.deepEqual(r, { value: undefined, done: true }, - 'read()ing from the reader should give a done result')) + stream.cancel().catch(e => t.equal(e.constructor, TypeError, 'cancel() should reject with a TypeError')); + t.equal(reader.isActive, true, 'the reader should still be active'); }); }; diff --git a/reference-implementation/test/templated/readable-stream-errored-sync-only.js b/reference-implementation/test/templated/readable-stream-errored-sync-only.js index 7e01d1efe..45579c9a8 100644 --- a/reference-implementation/test/templated/readable-stream-errored-sync-only.js +++ b/reference-implementation/test/templated/readable-stream-errored-sync-only.js @@ -19,15 +19,4 @@ export default (label, factory, error) => { t.notEqual(cancelPromise1, closedPromise, 'cancel() promise 1 should be distinct from closed'); t.notEqual(cancelPromise2, closedPromise, 'cancel() promise 2 should be distinct from closed'); }); - - test('getReader() should throw the error', t => { - t.plan(1); - const rs = factory(); - - try { - rs.getReader(); - } catch (e) { - t.equal(e, error, 'getReader() should throw the error'); - } - }); }; diff --git a/reference-implementation/test/templated/readable-stream-errored.js b/reference-implementation/test/templated/readable-stream-errored.js index 67c9e3d75..2866f9a3b 100644 --- a/reference-implementation/test/templated/readable-stream-errored.js +++ b/reference-implementation/test/templated/readable-stream-errored.js @@ -15,8 +15,8 @@ export default (label, factory, error) => { ); }); - test('piping to a WritableStream in the writable state should fail', t => { - t.plan(3); + test('piping to a WritableStream in the writable state should abort the writable stream', t => { + t.plan(4); const rs = factory(); @@ -31,8 +31,8 @@ export default (label, factory, error) => { close() { t.fail('Unexpected close call'); }, - abort() { - t.fail('Unexpected abort call'); + abort(reason) { + t.equal(reason, error); } }); @@ -43,9 +43,19 @@ export default (label, factory, error) => { () => t.fail('pipeTo promise should not be fulfilled'), e => { t.equal(e, error, 'pipeTo promise should be rejected with the passed error'); - t.equal(ws.state, 'writable', 'writable stream should still be writable'); + t.equal(ws.state, 'errored', 'writable stream should become errored'); } ); }); }); + + test('getReader() should return a reader that acts errored', t => { + t.plan(2); + const rs = factory(); + + const reader = rs.getReader(); + + reader.closed.catch(e => t.equal(e, error, 'reader.closed should reject with the error')); + reader.read().catch(e => t.equal(e, error, 'reader.read() should reject with the error')); + }); }; diff --git a/reference-implementation/test/utils/readable-stream-to-array.js b/reference-implementation/test/utils/readable-stream-to-array.js index 7c8b3d3db..57d32040c 100644 --- a/reference-implementation/test/utils/readable-stream-to-array.js +++ b/reference-implementation/test/utils/readable-stream-to-array.js @@ -1,6 +1,5 @@ -export default function readableStreamToArray(readable) { +export default function readableStreamToArray(readable, reader = readable.getReader()) { const chunks = []; - const reader = readable.getReader(); return pump();