Change the model for ReadableStream to have async read()

This replaces the previous dual ready + read() approach, which was derived from the epoll(7) + read(2) paradigm. In #253, we discussed how the ready + read() model conflicts with the semantics we want for byte streams. Briefly, because some byte streams will demand to know the size of the buffer they must fill before doing any I/O (the fread(3) model), the readInto(arrayBuffer, ...) method for byte streams must be asynchronous. If such byte streams are then to conform to the readable stream interface, with a read() method derived from their readInto() method, then read() must also be async, across all readable streams.
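
For illustration, here is a minimal sketch of the consumption-model change, using the ready/state members and the ReadableStream.EOS sentinel that appear in the examples elsewhere in this commit; consume() is a hypothetical placeholder, not part of any API.

```js
// Old model: synchronous read(), gated on the ready promise and the state property.
function drainOld(stream) {
  while (stream.state === "readable") {
    consume(stream.read());
  }
  if (stream.state === "waiting") {
    stream.ready.then(() => drainOld(stream));
  }
}

// New model: read() is always async and fulfills with either a chunk or the EOS
// sentinel, so even synchronously-available chunks arrive through promises.
function drainNew(stream) {
  return stream.read().then(chunk => {
    if (chunk === ReadableStream.EOS) {
      return;
    }
    consume(chunk);
    return drainNew(stream);
  });
}
```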

This is a slight usability upgrade for consumers, in some cases. However, it potentially costs more microtasks when multiple chunks of data would be available synchronously.

In the process of updating the tests to reflect async read, they were given a number of small unrelated tweaks (e.g. to wording, or to eliminate some setTimeout(,0)s).

TODO:
- This commit eliminates ExclusiveStreamReader, but this was done in error based on mistaken assumptions. It will be reversed.
- Almost none of the spec is updated: Examples.md and the examples in the spec were updated, but none of the algorithms or non-normative notes were.
domenic committed Feb 20, 2015
1 parent a4ffbfd commit 2ee6a77
Showing 19 changed files with 1,266 additions and 2,129 deletions.
48 changes: 8 additions & 40 deletions Examples.md
@@ -4,36 +4,6 @@ Many examples of using and creating streams are given in-line in the specificati

## Readable Streams

### Getting the Next Piece of Available Data

As another example, this helper function will return a promise for the next available piece of data from a given readable stream. This introduces an artificial delay if there is already data queued, but can provide a convenient interface for simple chunk-by-chunk consumption, as one might do e.g. when streaming database records. It uses an EOF sentinel to signal the end of the stream, and behaves poorly if called twice in parallel without waiting for the previously-returned promise to fulfill.

```js
const EOF = Symbol("ReadableStream getNext EOF");

function getNext(stream) {
  if (stream.state === "closed") {
    return Promise.resolve(EOF);
  }

  return stream.ready.then(() => {
    if (stream.state === "closed") {
      return EOF;
    }

    // If stream is "errored", this will throw, causing the promise to be rejected.
    return stream.read();
  });
}

// Usage with proposed ES2016 async/await keywords:
async function processStream(stream) {
  let chunk;
  while ((chunk = await getNext(stream)) !== EOF) {
    // do something with `chunk`.
  }
}
```
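
Under the async read() model introduced by this commit, such a helper is no longer necessary, since read() itself returns a promise for the next chunk. A rough equivalent (a sketch only, assuming the ReadableStream.EOS sentinel used in the updated examples below) would be:

```js
async function processStream(stream) {
  let chunk;
  while ((chunk = await stream.read()) !== ReadableStream.EOS) {
    // do something with `chunk`.
  }
}
```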

### Buffering the Entire Stream Into Memory

This function uses the reading APIs to buffer the entire stream in memory and give a promise for the results, defeating the purpose of streams but educating us while doing so:
@@ -42,19 +12,17 @@ This function uses the reading APIs to buffer the entire stream in memory and gi
function readableStreamToArray(readable) {
  const chunks = [];

  pump();
  return readable.closed.then(() => chunks);
  return pump();

  function pump() {
    while (readable.state === "readable") {
      chunks.push(readable.read());
    }

    if (readable.state === "waiting") {
      readable.ready.then(pump);
    }
    return readable.read().then(chunk => {
      if (chunk === ReadableStream.EOS) {
        return chunks;
      }

      // Otherwise the stream is "closed" or "errored", which will be handled above.
      chunks.push(chunk);
      return pump();
    });
  }
}
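
// Hypothetical usage sketch (someReadableStream is assumed, not part of the
// original example): the promise returned by readableStreamToArray() now
// fulfills with the array of accumulated chunks once EOS has been reached.
readableStreamToArray(someReadableStream).then(chunks => {
  console.log("Buffered " + chunks.length + " chunks in memory");
});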

43 changes: 18 additions & 25 deletions index.bs
@@ -189,23 +189,18 @@ associated reader will automatically release its lock.
</div>

<div class="example">
Although readable streams will usually be used by piping them to a writable stream, you can also "pump" them
directly, alternating between using the <code>read()</code> method and the <code>ready</code> getter according to the
current value of the <code>state</code> property. For example, this function writes the contents of a readable stream
to the console as fast as they are available.
Although readable streams will usually be used by piping them to a writable stream, you can also read them directly,
using their <code>read()</code> method to get successive chunks. For example, this function writes the contents of a
readable stream to the console as fast as they are available.

<pre><code class="lang-javascript">
function logChunks(readableStream) {
  while (readableStream.state === "readable") {
    console.log(readableStream.read());
  }

  if (readableStream.state === "waiting") {
    console.log("--- waiting for more data to be available...");
    readableStream.ready.then(() => logChunks(readableStream));
  }
  return readableStream.read().then(chunk => {
    if (chunk !== ReadableStream.EOS) {
      console.log(chunk);
      return logChunks(readableStream);
    }
  });

  return readableStream.closed;
}

logChunks(readableStream)
@@ -228,7 +223,6 @@ would look like
constructor(underlyingSource = {})

get closed()
get ready()
get state()

cancel(reason)
@@ -477,18 +471,17 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
  const reader = readableStream.getReader();
  const chunks = [];

  pump();

  return reader.closed.then(() => chunks);
  return pump();

  function pump() {
    while (reader.state === "readable") {
      chunks.push(reader.read());
    }
    return reader.read().then(chunk => {
      if (chunk === ReadableStream.EOS) {
        return chunks;
      }

      if (reader.state === "waiting") {
        reader.ready.then(pump);
      }
      chunks.push(chunk);
      return pump();
    });
  }
}
</code></pre>
@@ -2206,8 +2199,8 @@ APIs:
streamyWS.writable.write("Hello");
streamyWS.writable.write("web socket!");

streamyWS.readable.ready.then(() => {
  console.log("The web socket says: ", streamyWS.readable.read());
});
streamyWS.readable.read().then(chunk => {
  console.log("The web socket says: ", chunk);
});
</code></pre>

148 changes: 0 additions & 148 deletions reference-implementation/lib/exclusive-stream-reader.js

This file was deleted.

