Skip to content

Commit

Permalink
Allow custom stream implementations to work with the reader
Browse files Browse the repository at this point in the history
Implements option 3 of #251. Now the tests are passing since ReadableByteStream can participate in locking as well.
  • Loading branch information
domenic committed Dec 12, 2014
1 parent 1ad0e39 commit 9c7150e
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 64 deletions.
34 changes: 12 additions & 22 deletions Locking Design Doc.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Example code:
```js
function readAsJson(rs) {
var string = "";
var reader = new ExclusiveStreamReader(rs);
var reader = rs.getReader();

pump();

Expand Down Expand Up @@ -63,21 +63,9 @@ The reader should have all of the non-piping-related public interface of the str
- `read()` method, which has the same behavior as that of the stream's except that it works while the stream is locked
- `cancel()` method, which first calls `this.releaseLock()` before the pass-through

Additionally, it should be possible to check whether a stream is locked. A few candidate syntaxes:
Additionally, it is probably a good idea to be able to tell if a reader is still active/has been released. We propose `reader.isActive`.

```js
stream.isLocked
ExclusiveStreamReader.isLocked(stream)
```

Finally it is probably a good idea to be able to tell if a reader is still active/has been released. One of these two maybe:

```js
reader.isReleased
reader.isActive // inverse
```

(For now we have settled on `ExclusiveStreamReader.isLocked(stream)` and `reader.isActive`.)
It might be useful to, given a stream, tell if it is locked. However, I argue that this is not necessary, and could be viewed as an unnecessary information leak. We could add it back later if there's a compelling use case.

### Level 2: subclassers of `ReadableStream`

Expand All @@ -93,19 +81,21 @@ It is unclear whether this is necessary, but up until now we have a high level o

If we encourage this kind of thing, we should make it easy for custom readable streams to be lockable as well. That basically means `ExclusiveStreamReader` should not require knowledge of `ReadableStream`'s internal slots.

We can work around this if necessary by passing `ExclusiveStreamReader` any capabilities it needs to manipulate `ReadableStream`'s internal state; then people reimplementing the readable stream interface can do e.g. `new ExclusiveStreamReader(this, { getLock, setLock })` or similar.
We can work around this if necessary by passing `ExclusiveStreamReader` any capabilities it needs to manipulate `ReadableStream`'s internal state; then people reimplementing the readable stream interface can do e.g. `new ExclusiveStreamReader(this, { getReader, setReader })` or similar.

This would impact the API by mandating a `rs.getReader()` interface that calls the constructor, instead of a `new ExclusiveStreamReader(stream)` interface.

This would probably impact API, by switching us to a `rs.getReader()` interface that calls the constructor, instead of a `new ExclusiveStreamReader(stream)` interface.
One benefit of doing this is that it makes `ReadableByteStream` less "special." Without a mechanism like this, we'd need to hard-code in support for both `ReadableStream` and `ReadableByteStream`.

## Optimizability

The need to support subclassing, via `ExclusiveStreamReader` delegating to the `ReadableStream` implementation, conflicts a bit with the desire for readers to be fast. However, this can be fixed with some cleverness.
The need to support subclassing, via `ExclusiveStreamReader` delegating to the `ReadableStream` implementation, conflicts a bit with the desire for readers to be fast. Similarly, the need to support custom readable stream implementations means indirecting through the `getReader` and `setReader` functions. However, this can be fixed with some cleverness.

The spec semantics for e.g. `reader.read()` are essentially:

- Check that `reader@[[stream]]` is locked to `reader`.
- Unlock `reader@[[stream]]`.
- Try `return reader@[[stream]].read()`; finally re-lock `reader@[[stream]]`.
- Check that `reader@[[stream]]` is locked to `reader`, using `getReader(reader@[[stream]]) === reader`.
- Unlock `reader@[[stream]]` using `setReader(reader@[[stream]], undefined)`.
- Try `return reader@[[stream]].read()`; finally re-lock `reader@[[stream]]` using `setReader(reader@[[stream]], reader)`.

This will ensure that if `reader@[[stream]]` is a subclass of `ReadableStream`, it will polymorphically dispatch to the subclass's `read` method. However, this kind of try/finally pattern is not very optimizable in V8.

Expand All @@ -115,7 +105,7 @@ Here is an optimization that can be performed instead, with slight tweaks to bot
- Check that `this` is not locked.
- Return `ReadFromReadableStream(this)`. (That is, extract the main functionality, without the check, into its own function.)
- Define `ExclusiveStreamReader.prototype.read` like so:
- Check that `this@[[stream]]` is locked to `this`.
- Check that `this@[[stream]]` is locked to `this`, using `getReader(reader@[[stream]]) === reader` which should be inline-able in the case of the built-in `getReader` instances.
- If `this@[[stream]].read` is equal to the original `ReadableStream.prototype.read`: return `ReadFromReadableStream(this@[[stream]])`.
- Otherwise, proceed via the per-spec semantics above.

Expand Down
67 changes: 26 additions & 41 deletions reference-implementation/lib/exclusive-stream-reader.js
Original file line number Diff line number Diff line change
@@ -1,46 +1,54 @@
var assert = require('assert');

export default class ExclusiveStreamReader {
constructor(stream) {
ensureIsRealStream(stream);
constructor(stream, { getReader, setReader }) {
if (typeof getReader !== 'function') {
throw new TypeError('lock must be a function');
}

if (typeof setReader !== 'function') {
throw new TypeError('unlock must be a function');
}

if (stream._reader !== undefined) {
if (getReader(stream) !== undefined) {
throw new TypeError('This stream has already been locked for exclusive reading by another reader');
}

stream._reader = this;
setReader(stream, this);

this._stream = stream;
this._getReader = getReader;
this._setReader = setReader;

this._lockReleased = new Promise(resolve => {
this._lockReleased_resolve = resolve;
});
}

get ready() {
ensureStreamReaderIsExclusive(this);
EnsureStreamReaderIsExclusive(this);

this._stream._reader = undefined;
this._setReader(this._stream, undefined);
try {
return this._stream.ready;
} finally {
this._stream._reader = this;
this._setReader(this._stream, this);
}
}

get state() {
ensureStreamReaderIsExclusive(this);
EnsureStreamReaderIsExclusive(this);

this._stream._reader = undefined;
this._setReader(this._stream, undefined);
try {
return this._stream.state;
} finally {
this._stream._reader = this;
this._setReader(this._stream, this);
}
}

get closed() {
ensureStreamReaderIsExclusive(this);
EnsureStreamReaderIsExclusive(this);

return this._stream.closed;
}
Expand All @@ -50,56 +58,33 @@ export default class ExclusiveStreamReader {
}

read(...args) {
ensureStreamReaderIsExclusive(this);
EnsureStreamReaderIsExclusive(this);

this._stream._reader = undefined;
this._setReader(this._stream, undefined);
try {
return this._stream.read(...args);
} finally {
this._stream._reader = this;
this._setReader(this._stream, this);
}
}

cancel(reason, ...args) {
ensureStreamReaderIsExclusive(this);
EnsureStreamReaderIsExclusive(this);

var stream = this._stream;
this.releaseLock();
return stream.cancel(reason, ...args);
}

releaseLock() {
if (this._stream === undefined) {
return;
}

this._stream._reader = undefined;
this._setReader(this._stream, undefined);
this._stream = undefined;
this._lockReleased_resolve(undefined);
}

static isLocked(stream) {
ensureIsRealStream(stream);

return stream._reader !== undefined;
}
}

// These do not appear in the spec (thus the lower-case names), since they're one-liners in spec text anyway, but we
// factor them out into helper functions in the reference implementation just for brevity's sake, and to emphasize that
// the error message is the same in all places they're called, and to give us the opportunity to add an assert.

function ensureStreamReaderIsExclusive(reader) {
if (reader._stream === undefined) {
function EnsureStreamReaderIsExclusive(reader) {
if (reader._getReader(reader._stream) !== reader) {
throw new TypeError('This stream reader has released its lock on the stream and can no longer be used');
}

assert(reader._stream._reader === reader,
'If the reader has a [[stream]] then the stream\'s [[reader]] must be this reader');
}

function ensureIsRealStream(stream) {
if (!('_reader' in stream)) {
throw new TypeError('ExclusiveStreamReader can only be used with ReadableStream objects or subclasses');
}
}
17 changes: 17 additions & 0 deletions reference-implementation/lib/experimental/readable-byte-stream.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var assert = require('assert');
import * as helpers from '../helpers';
import ReadableStream from '../readable-stream';
import ExclusiveStreamReader from '../exclusive-stream-reader';

function notifyReady(stream) {
if (stream._state !== 'waiting') {
Expand Down Expand Up @@ -58,6 +59,7 @@ export default class ReadableByteStream {
}

this._state = 'waiting';
this._byteStreamReader = undefined;

this._onReadInto = readInto;
this._onCancel = cancel;
Expand Down Expand Up @@ -169,6 +171,13 @@ export default class ReadableByteStream {
return resizedArrayBuffer;
}

getReader() {
return new ExclusiveStreamReader(this, {
getReader: getReadableByteStreamReader,
setReader: setReadableByteStreamReader
});
}

// Note: We plan to make this more efficient in the future. But for now this
// implementation suffices to show interoperability with a generic
// WritableStream.
Expand Down Expand Up @@ -223,3 +232,11 @@ export default class ReadableByteStream {
this._closedPromise_reject = null;
}
}

function getReadableByteStreamReader(stream) {
return stream._byteStreamReader;
}

function setReadableByteStreamReader(stream, reader) {
stream._byteStreamReader = reader;
}
14 changes: 13 additions & 1 deletion reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ export default class ReadableStream {
return sourceCancelPromise.then(() => undefined);
}

getReader() {
return new ExclusiveStreamReader(this, { getReader: getReadableStreamReader, setReader: setReadableStreamReader });
}

pipeThrough({ writable, readable }, options) {
if (!helpers.typeIsObject(writable)) {
throw new TypeError('A transform stream must have an writable property that is an object.');
Expand All @@ -92,7 +96,7 @@ export default class ReadableStream {
}

pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) {
var source = new ExclusiveStreamReader(this);
var source = this.getReader();
preventClose = Boolean(preventClose);
preventAbort = Boolean(preventAbort);
preventCancel = Boolean(preventCancel);
Expand Down Expand Up @@ -367,3 +371,11 @@ var defaultReadableStreamStrategy = {
return 1;
}
};

function getReadableStreamReader(stream) {
return stream._reader;
}

function setReadableStreamReader(stream, reader) {
stream._reader = reader;
}

0 comments on commit 9c7150e

Please sign in to comment.