From bd839e5c7527ebf7800842f2d9a3bc7e629f09d1 Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Tue, 18 Aug 2015 17:57:54 -0400 Subject: [PATCH] Update IsReadableStreamDisturbed to work after close or error --- index.bs | 5 +- .../lib/readable-stream.js | 14 +++- reference-implementation/test/abstract-ops.js | 80 ++++++++++++++++++- 3 files changed, 91 insertions(+), 8 deletions(-) diff --git a/index.bs b/index.bs index 551f24a9d..f4aee142b 100644 --- a/index.bs +++ b/index.bs @@ -812,6 +812,7 @@ Instances of ReadableStreamReader are created with the internal slo 1. If IsReadableStreamReader(*this*) is *false*, return a promise rejected with a *TypeError* exception. + 1. If *this*@[[ownerReadableStream]] is not *undefined*, set *this*@[[ownerReadableStream]]@[[disturbed]] to *true*. 1. If *this*@[[state]] is "closed", return a new promise resolved with *undefined*. 1. If *this*@[[state]] is "errored", return a new promise rejected with *this*@[[storedError]]. 1. Assert: *this*@[[ownerReadableStream]] is not *undefined*. @@ -878,9 +879,9 @@ reader for a given stream.

CancelReadableStream ( stream, reason )

+ 1. Set _stream_@[[disturbed]] to *true*. 1. If _stream_@[[state]] is "closed", return a new promise resolved with *undefined*. 1. If _stream_@[[state]] is "errored", return a new promise rejected with _stream_@[[storedError]]. - 1. Set _stream_@[[disturbed]] to *true*. 1. Set _stream_@[[queue]] to a new empty List. 1. Perform FinishClosingReadableStream(_stream_). 1. Let _sourceCancelPromise_ be PromiseInvokeOrNoop(_stream_@[[underlyingSource]], "cancel", «‍_reason_»). @@ -1048,11 +1049,11 @@ readable stream is locked to a reader.

ReadFromReadableStreamReader ( reader )

+ 1. If *this*@[[ownerReadableStream]] is not *undefined*, set *this*@[[ownerReadableStream]]@[[disturbed]] to *true*. 1. If _reader_@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(*undefined*, *true*). 1. If _reader_@[[state]] is "errored", return a new promise rejected with _reader_@[[storedError]]. 1. Assert: _reader_@[[ownerReadableStream]] is not *undefined*. 1. Assert: _reader_@[[ownerReadableStream]]@[[state]] is "readable". - 1. Set _reader_@[[ownerReadableStream]]@[[disturbed]] to *true*. 1. If _reader_@[[ownerReadableStream]]@[[queue]] is not empty, 1. Let _chunk_ be DequeueValue(_reader_@[[ownerReadableStream]]@[[queue]]). 1. If _reader_@[[ownerReadableStream]]@[[closeRequested]] is *true* and _reader_@[[ownerReadableStream]]@[[queue]] is now empty, perform FinishClosingReadableStream(_reader_@[[ownerReadableStream]]). diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index df32b70da..7b6186676 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -299,6 +299,10 @@ class ReadableStreamReader { new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader')); } + if (this._ownerReadableStream !== undefined) { + this._ownerReadableStream._disturbed = true; + } + if (this._state === 'closed') { return Promise.resolve(undefined); } @@ -352,6 +356,8 @@ function AcquireReadableStreamReader(stream) { } function CancelReadableStream(stream, reason) { + stream._disturbed = true; + if (stream._state === 'closed') { return Promise.resolve(undefined); } @@ -359,8 +365,6 @@ function CancelReadableStream(stream, reason) { return Promise.reject(stream._storedError); } - stream._disturbed = true; - stream._queue = []; FinishClosingReadableStream(stream); @@ -538,6 +542,10 @@ function IsReadableStreamReader(x) { } function ReadFromReadableStreamReader(reader) { + if (reader._ownerReadableStream !== undefined) { + reader._ownerReadableStream._disturbed = true; + } + if (reader._state === 'closed') { return Promise.resolve(CreateIterResultObject(undefined, true)); } @@ -549,8 +557,6 @@ function ReadFromReadableStreamReader(reader) { assert(reader._ownerReadableStream !== undefined); assert(reader._ownerReadableStream._state === 'readable'); - reader._ownerReadableStream._disturbed = true; - if (reader._ownerReadableStream._queue.length > 0) { const chunk = DequeueValue(reader._ownerReadableStream._queue); diff --git a/reference-implementation/test/abstract-ops.js b/reference-implementation/test/abstract-ops.js index 3070a3d20..d3d1504f3 100644 --- a/reference-implementation/test/abstract-ops.js +++ b/reference-implementation/test/abstract-ops.js @@ -2,7 +2,7 @@ const test = require('tape-catch'); import { IsReadableStreamDisturbed } from '../lib/readable-stream' -test('IsReadableStreamDisturbed returns true for a stream on which read() has been called', t => { +test('IsReadableStreamDisturbed returns true for an empty non-closed stream on which read() has been called', t => { const rs = new ReadableStream(); t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); @@ -17,7 +17,7 @@ test('IsReadableStreamDisturbed returns true for a stream on which read() has be t.end(); }); -test('IsReadableStreamDisturbed returns true for a stream on which cancel() has been called', t => { +test('IsReadableStreamDisturbed returns true for an empty non-closed stream on which cancel() has been called', t => { const rs = new ReadableStream(); t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); @@ -31,3 +31,79 @@ test('IsReadableStreamDisturbed returns true for a stream on which cancel() has t.end(); }); + +test('IsReadableStreamDisturbed returns true for a closed stream on which read() has been called', t => { + const rs = new ReadableStream({ + start(c) { + c.close(); + } + }); + + t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); + + const reader = rs.getReader(); + t.equal(IsReadableStreamDisturbed(rs), false, + 'getReader() call has no effect on whether a stream is disturbed or not'); + + reader.read(); + t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after read() call'); + + t.end(); +}); + +test('IsReadableStreamDisturbed returns true for a closed stream on which cancel() has been called', t => { + const rs = new ReadableStream({ + start(c) { + c.close(); + } + }); + + t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); + + const reader = rs.getReader(); + t.equal(IsReadableStreamDisturbed(rs), false, + 'getReader() call has no effect on whether a stream is disturbed or not'); + + reader.cancel(); + t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after cancel() call'); + + t.end(); +}); + +test('IsReadableStreamDisturbed returns true for an errored stream on which read() has been called', t => { + const rs = new ReadableStream({ + start(c) { + c.error(new Error('waffles')); + } + }); + + t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); + + const reader = rs.getReader(); + t.equal(IsReadableStreamDisturbed(rs), false, + 'getReader() call has no effect on whether a stream is disturbed or not'); + + reader.read(); + t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after read() call'); + + t.end(); +}); + +test('IsReadableStreamDisturbed returns true for an errored stream on which cancel() has been called', t => { + const rs = new ReadableStream({ + start(c) { + c.error(new Error('waffles')); + } + }); + + t.equal(IsReadableStreamDisturbed(rs), false, 'rs should not be disturbed on construction'); + + const reader = rs.getReader(); + t.equal(IsReadableStreamDisturbed(rs), false, + 'getReader() call has no effect on whether a stream is disturbed or not'); + + reader.cancel(); + t.equal(IsReadableStreamDisturbed(rs), true, 'rs should be disturbed after cancel() call'); + + t.end(); +});