Skip to content

Commit

Permalink
Add ability to exclusively lock a readable stream
Browse files Browse the repository at this point in the history
  • Loading branch information
domenic committed Dec 23, 2014
1 parent 4567064 commit d0b6d73
Show file tree
Hide file tree
Showing 7 changed files with 806 additions and 73 deletions.
108 changes: 108 additions & 0 deletions Locking Design Doc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Locking a Stream for Exclusive Reading

In [#241](https://github.com/whatwg/streams/issues/241) we had a great conversation about the need for being able to "lock" a stream for exclusive use. This would be done implicitly while piping, but could also be useful for building user-facing abstractions, as we'll see below.

What emerged was the idea of a "stream reader," which has most of the readable stream interface, but while it exists you cannot read from the stream except through that reader.

This document represents some formative rationales for the design of the reader concept, approached from the perspective of a developer that uses increasingly complex features of the streams ecosystem.

## Developer usage

### Level 0: no reader usage

If the developer knows nothing about readers, they can continue using the stream just fine.

- `read()`, `state`, and `ready` all behave as they do now if used without `pipeTo`.
- `pipeTo` will cause the following side effects:
- `read()` will throw an informative error
- `state` will return `"waiting"` until the pipe completes (successfully or otherwise)
- `ready` will return a promise that remains pending until the pipe completes

### Level 1: using readers directly

The developer might want to create their own abstractions that require exclusive access to the stream. For example, a read-to-end function would probably want to avoid others being able to call `.read()` in the middle.

Example code:

```js
function readAsJson(rs) {
var string = "";
var reader = rs.getReader();

pump();

// These lines would be simpler with `Promise.prototype.finally` (or async functions).
return reader.closed.then(
() => {
reader.releaseLock();
return JSON.parse(string);
},
e => {
reader.releaseLock();
throw e;
}
);

function pump() {
while (reader.state === "readable") {
string += reader.read();
}
if (reader.state === "waiting") {
reader.ready.then(pump);
}
}
}
```

The stream would have the same behaviors after being passed to `readAsJson` that it would have after calling its `pipeTo` method.

The reader should have all of the non-piping-related public interface of the stream. This includes:

- `closed` getter, which is a pass-through
- `state` and `ready` getters, which reveal the "true" state and state transitions of the stream which the stream itself no longer reveals
- `read()` method, which has the same behavior as that of the stream's except that it works while the stream is locked
- `cancel()` method, which first calls `this.releaseLock()` before the pass-through

While a stream is locked, it is indistinguishable from a stream that has been drained of all chunks and is not getting any more enqueued. We could consider adding some kind of test, like `stream.isLocked`, to distinguish. However, it's not clear there's a compelling reason for doing so (let us know if so?), and the indistinguishability is kind of a nice property from the perspective of the principle of least authority.

For readers, you should be able to tell if they're still active (i.e. have not been released) via `reader.isActive`.

### Level 2: subclassers of `ReadableStream`

Subclasses of `ReadableStream` should get locking support "for free." The same mechanisms for acquiring and using a lock should work flawlessly. More interestingly, if they wanted to support modifying the behavior of e.g. `read()` (or `state` or `ready` or `closed`), they should only have to override it in one location.

Which location is more friendly? Probably in `ReadableStream`, so that `ExclusiveStreamReader` still works for `ReadableStream` subclasses. Less work.

This means `ExclusiveStreamReader` should delegate to `ReadableStream`, and not the other way around.

### Level 3: custom readable stream implementations?

It is unclear whether this is necessary, but up until now we have a high level of support for anyone who wants to re-implement the entire `ReadableStream` interface with their own specific code. For example, if you implement `state`, `ready`, `closed`, `read()`, and `cancel()`, you can do `myCustomStream.pipeTo = ReadableStream.prototype.pipeTo` and it will continue to work.

If we encourage this kind of thing, we should make it easy for custom readable streams to be lockable as well. That basically means `ExclusiveStreamReader` should not require knowledge of `ReadableStream`'s internal slots.

We can work around this if necessary by passing `ExclusiveStreamReader` any capabilities it needs to manipulate `ReadableStream`'s internal state; then people reimplementing the readable stream interface can do e.g. `new ExclusiveStreamReader(this, { getLock, setLock })` or similar.

## Optimizability

The need to support subclassing, via `ExclusiveStreamReader` delegating to the `ReadableStream` implementation, conflicts a bit with the desire for readers to be fast. However, this can be fixed with some cleverness.

The spec semantics for e.g. `reader.read()` are essentially:

- Check that `reader@[[stream]]` is locked to `reader`.
- Unlock `reader@[[stream]]`.
- Try `return reader@[[stream]].read()`; finally re-lock `reader@[[stream]]`.

This will ensure that if `reader@[[stream]]` is a subclass of `ReadableStream`, it will polymorphically dispatch to the subclass's `read` method. However, this kind of try/finally pattern is not very optimizable in V8.

Here is an optimization that can be performed instead, with slight tweaks to both `ReadableStream.prototype.read` and `ExclusiveStreamReader.prototype.read`:

- Define `ReadableStream.prototype.read` as:
- Check that `this` is not locked.
- Return `ReadFromReadableStream(this)`. (That is, extract the main functionality, without the check, into its own function.)
- Define `ExclusiveStreamReader.prototype.read` like so:
- Check that `this@[[stream]]` is locked to `this`.
- If `this@[[stream]].read` is equal to the original `ReadableStream.prototype.read`: return `ReadFromReadableStream(this@[[stream]])`.
- Otherwise, proceed via the per-spec semantics above.

This essentially ensures that all undisturbed readable streams, or readable stream subclasses that do not override `read`, go down the "fast path" by ignoring all the try/finally and lock/unlock business. It is unobservable, since we have checked that `read` has not been modified in any way.
157 changes: 139 additions & 18 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,22 @@ based the total size of all chunks in the stream's internal queue.
backpressure signal.
</div>

<h3 id="locking">Locking</h3>

<!-- TODO: writable streams too, probably -->

An <dfn>exclusive stream reader</dfn> or simply reader is an object that encapsulates a <a>readable stream</a>,
preventing access to the stream except through the reader's interface. We say in this case the stream is
<dfn title="locked to a reader">locked to the reader</dfn>, and that the reader is
<dfn title="active reader">active</dfn>.

The reader presents most of the stream's interface, but while it is active, only the reader's methods and properties
can be used to successfully manipulate and interrogate the state of the stream; when the stream is used directly, it
appears as if it is empty.

A reader also has the capability to <dfn title="release a read lock">release its read lock</dfn>, which makes it no
longer active. At this point the original stream can be used as before, and the reader becomes inert.

<h2 id="rs">Readable Streams</h2>

<h3 id="rs-intro">Introduction to Readable Streams</h3>
Expand Down Expand Up @@ -376,6 +392,7 @@ would look like
get state()

cancel(reason)
getReader()
pipeThrough({ writable, readable }, options)
pipeTo(dest, { preventClose, preventAbort, preventCancel } = {})
read()
Expand Down Expand Up @@ -433,6 +450,10 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
<td>\[[queue]]
<td>A List representing the stream's internal queue of <a>chunks</a>
</tr>
<tr>
<td>\[[reader]]
<td>A <code>ExclusiveStreamReader</code> instance, if the stream is locked to an exclusive reader, or
<b>undefined</b> if it is not
<tr>
<td>\[[started]]
<td>A boolean flag indicating whether the <a>underlying source</a> has finished starting
Expand Down Expand Up @@ -493,6 +514,7 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
<li> Set <b>this</b>@\[[queue]] to a new empty List.
<li> Set <b>this</b>@\[[state]] to <code>"waiting"</code>.
<li> Set <b>this</b>@\[[started]], <b>this</b>@\[[draining]], and <b>this</b>@\[[pulling]] to <b>false</b>.
<li> Set <b>this</b>@\[[reader]] to <b>undefined</b>.
<li> Set <b>this</b>@\[[enqueue]] to CreateReadableStreamEnqueueFunction(<b>this</b>).
<li> Set <b>this</b>@\[[close]] to CreateReadableStreamCloseFunction(<b>this</b>).
<li> Set <b>this</b>@\[[error]] to CreateReadableStreamErrorFunction(<b>this</b>).
Expand Down Expand Up @@ -532,6 +554,7 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
</div>

<ol>
<li> If <b>this</b>@\[[reader]] is not <b>undefined</b>, return <b>this</b>@\[[reader]]@\[[lockReleased]].
<li> Return <b>this</b>@\[[readyPromise]].
</ol>

Expand All @@ -553,9 +576,12 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
<dt><code>"errored"</code>
<dd>An error occurred interacting with the <a>underlying source</a>, and so the stream is now dead.
</dl>

If the stream is <a>locked to a reader</a>, the stream will appear to be <code>"waiting"</code>.
</div>

<ol>
<li> If <b>this</b>@\[[reader]] is not <b>undefined</b>, return <code>"waiting"</code>.
<li> Return <b>this</b>@\[[state]].
</ol>

Expand All @@ -565,19 +591,44 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
The <code>cancel</code> method signals a loss of interest in the stream by a consumer. Calling it will immediately
move the stream to a <code>"closed"</code> state, throwing away any queued data, as well as executing any
cancellation mechanism of the <a>underlying source</a>.

Readable streams cannot be cancelled while <a>locked to a reader</a>; this method will return a rejected promise.
</div>

<ol>
<li> If <b>this</b>@\[[reader]] is not <b>undefined</b>, return a new promise rejected with a <b>TypeError</b>.
<li> If <b>this</b>@\[[state]] is <code>"closed"</code>, return a new promise resolved with <b>undefined</b>.
<li> If <b>this</b>@\[[state]] is <code>"errored"</code>, return a new promise rejected with <b>this</b>@\[[storedError]].
<li> If <b>this</b>@\[[state]] is <code>"waiting"</code>, resolve <b>this</b>@\[[readyPromise]] with <b>undefined</b>.
<li> Let <b>this</b>@\[[queue]] be a new empty List.
<li> Set <b>this</b>@\[[state]] to <code>"closed"</code>.
<li> Resolve <b>this</b>@\[[closedPromise]] with <b>undefined</b>.
<li> Call-with-rethrow CloseReadableStream(<b>this</b>).
<li> Let <var>sourceCancelPromise</var> be the result of promise-calling <b>this</b>@\[[onCancel]](<var>reason</var>).
<li> Return the result of transforming <var>sourceCancelPromise</var> by a fulfillment handler that returns <b>undefined</b>.
</ol>

<h5 id="rs-get-reader">getReader()</h5>

<div class="note">
The <code>getReader</code> method creates an <a>exclusive stream reader</a> and
<a title="locked to a reader">locks</a> the stream to the the new reader. While the stream is locked, it cannot be
manipulated directly, and will appear to be an inert, empty stream waiting for new <a>chunks</a> to be enqueued.
Instead, the returned reader object can be used to read from or cancel the stream, or to discern its state and state
transitions. If or when the lock is <a title="release a read lock">released</a>, the stream can be used again as
normal.

This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its
entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours, interfering
with your abstraction or observing its side-effects.

Note that when a stream is closed or errors, any reader it is locked to is automatically released.
</div>

<ol>
<li> If <b>this</b>@\[[state]] is <code>"closed"</code>, throw a <b>TypeError</b> exception.
<li> If <b>this</b>@\[[state]] is <code>"errored"</code>, throw <b>this</b>@\[[storedError]].
<li> Return Construct(<code>ExclusiveStreamReader</code>, (<b>this</b>)).
</ol>

<h5 id="rs-pipe-through">pipeThrough({ writable, readable }, options)</h5>

<div class="note">
Expand Down Expand Up @@ -621,18 +672,15 @@ look for the <code>pipeTo</code> method.
</div>

<ol>
<li> If <b>this</b>@\[[reader]] is not <b>undefined</b>, throw a <b>TypeError</b> exception.
<li> If <b>this</b>@\[[state]] is <code>"waiting"</code> or <code>"closed"</code>, throw a <b>TypeError</b> exception.
<li> If <b>this</b>@\[[state]] is <code>"errored"</code>, throw <b>this</b>@\[[storedError]].
<li> Assert: <b>this</b>@\[[state]] is <code>"readable"</code>.
<li> Assert: <b>this</b>@\[[queue]] is not empty.
<li> Let <var>chunk</var> be DequeueValue(<b>this</b>@\[[queue]]).
<li> If <b>this</b>@\[[queue]] is now empty,
<ol>
<li> If <b>this</b>@\[[draining]] is <b>true</b>,
<ol>
<li> Set <b>this</b>@\[[state]] to <code>"closed"</code>.
<li> Resolve <b>this</b>@\[[closedPromise]] with <b>undefined</b>.
</ol>
<li> If <b>this</b>@\[[draining]] is <b>true</b>, call-with-rethrow CloseReadableStream(<b>this</b>).
<li> If <b>this</b>@\[[draining]] is <b>false</b>,
<ol>
<li> Set <b>this</b>@\[[state]] to <code>"waiting"</code>.
Expand All @@ -643,6 +691,76 @@ look for the <code>pipeTo</code> method.
<li> Return <var>chunk</var>.
</ol>

<h3 id="reader-class">Class <code>ExclusiveStreamReader</code></h3>

<h4 id="reader-class-definition">Class Definition</h4>

<em>This section is non-normative.</em>

If one were to write the <code>ExclusiveStreamReader</code> class in something close to the syntax of [[!ECMASCRIPT]],
it would look like

<pre><code class="lang-javascript">
class ExclusiveStreamReader {
constructor(stream)

get closed()
get isActive()
get ready()
get state()

cancel(reason, ...args)
read(...args)
releaseLock()
}
</code></pre>

<h4 id="reader-internal-slots">Internal Slots</h4>

Instances of <code>ExclusiveStreamReader</code> are created with the internal slots described in the following table:

<table>
<thead>
<tr>
<th>Internal Slot</th>
<th>Description (<em>non-normative</em>)</th>
</tr>
</thead>
<tr>
<td>\[[stream]]
<td>A <code>ReadableStream</code> instance that this reader is able to read from
</tr>
<tr>
<td>\[[lockReleased]]
<td>A promise that becomes fulfilled when the reader releases its lock on the stream
</tr>
</table>

<h4 id="reader-constructor">new ExclusiveStreamReader(stream)</h4>

<ol>
<li> If <var>stream</var> does not have a \[[reader]] internal slot, throw a <b>TypeError</b> exception.
<li> If <var>stream</var>@\[[reader]] is not <b>undefined</b>, throw a <b>TypeError</b> exception.
<li> Set <var>stream</var>@\[[reader]] to <b>this</b>.
<li> Set <b>this</b>@\[[stream]] to <var>stream</var>.
<li> Set <b>this</b>@\[[lockReleased]] to a new promise.
</ol>

<h4 id="reader-prototype">Properties of the <code>ExclusiveStreamReader</code> Prototype</h4>

<h5 id="reader-closed">get closed</h5>

<h5 id="reader-ready">get ready</h5>

<h5 id="reader-state">get state</h5>

<h5 id="reader-cancel">cancel(reason, ...args)</h5>

<h5 id="reader-read">read(...args)</h5>

<h5 id="reader-release-lock">releaseLock</h5>


<h3 id="rs-abstract-ops">Readable Stream Abstract Operations</h3>

<h4 id="call-readable-stream-pull">CallReadableStreamPull ( stream )</h4>
Expand All @@ -664,6 +782,15 @@ look for the <code>pipeTo</code> method.
<li> Otherwise, return <b>undefined</b>.
</ol>

<h4 id="close-readable-stream">CloseReadableStream ( stream )</h4>

<ol>
<li> Set <var>stream</stream>@\[[state]] to <code>"closed"</code>.
<li> Resolve <var>stream</stream>@\[[closedPromise]] with <b>undefined</b>.
<li> If <var>stream</var>@\[[reader]] is not <b>undefined</b>, call-with-rethrow Invoke(<var>stream</var>@\[[reader]], <code>"releaseLock"</code>).
<li> Return <b>undefined</b>.
</ol>

<h4 id="create-readable-stream-close-function">CreateReadableStreamCloseFunction ( stream )</h4>

<ol>
Expand All @@ -677,8 +804,7 @@ A <dfn>Readable Stream Close Function</dfn> is a built-in anonymous function of
<li> If <var>stream</var>@\[[state]] is <code>"waiting"</code>,
<ol>
<li> Resolve <var>stream</var>@\[[readyPromise]] with <b>undefined</b>.
<li> Resolve <var>stream</var>@\[[closedPromise]] with <b>undefined</b>.
<li> Set <var>stream</var>@\[[state]] to <code>"closed"</code>.
<li> Return CloseReadableStream(<b>this</b>).
</ol>
<li> If <var>stream</var>@\[[state]] is <code>"readable"</code>,
<ol>
Expand Down Expand Up @@ -728,19 +854,14 @@ A <dfn>Readable Stream Error Function</dfn> is a built-in anonymous function of
a variable <var>stream</var>, that performs the following steps:

<ol>
<li> If <var>stream</var>@\[[state]] is <code>"waiting"</code>,
<ol>
<li> Set <var>stream</var>@\[[state]] to <code>"errored"</code>.
<li> Set <var>stream</var>@\[[storedError]] to <var>e</var>.
<li> Resolve <var>stream</var>@\[[readyPromise]] with <b>undefined</b>.
<li> Reject <var>stream</var>@\[[closedPromise]] with <var>e</var>.
</ol>
<li> If <var>stream</var>@\[[state]] is <code>"readable"</code>,
<li> If <var>stream</var>@\[[state]] is <code>"waiting"</code>, resolve <var>stream</var>@\[[readyPromise]] with <b>undefined</b>.
<li> If <var>stream</var>@\[[state]] is <code>"readable"</code>, let <var>stream</var>@\[[queue]] be a new empty List.
<li> If <var>stream</var>@\[[state]] is either <code>"waiting"</code> or <code>"readable"</code>,
<ol>
<li> Let <var>stream</var>@\[[queue]] be a new empty List.
<li> Set <var>stream</var>@\[[state]] to <code>"errored"</code>.
<li> Set <var>stream</var>@\[[storedError]] to <var>e</var>.
<li> Reject <var>stream</var>@\[[closedPromise]] with <var>e</var>.
<li> If <var>stream</var>@\[[reader]] is not <b>undefined</b>, call-with-rethrow Invoke(<var>stream</var>@\[[reader]], <code>"releaseLock"</code>).
</ol>
</ol>

Expand Down
Loading

0 comments on commit d0b6d73

Please sign in to comment.