Skip to content

Commit

Permalink
stream: utility consumers for web and node.js streams
Browse files Browse the repository at this point in the history
Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: #39594
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
jasnell authored and danielleadams committed Aug 16, 2021
1 parent 4700f1e commit f57a0e4
Show file tree
Hide file tree
Showing 3 changed files with 442 additions and 0 deletions.
124 changes: 124 additions & 0 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -1219,5 +1219,129 @@ added: v16.6.0
* Type: {WritableStream}
### Class: `CompressionStream`
<!-- YAML
added: REPLACEME
-->
#### `new CompressionStream(format)`
<!-- YAML
added: REPLACEME
-->
* `format` {string} One of either `'deflate'` or `'gzip'`.
#### `compressionStream.readable`
<!-- YAML
added: REPLACEME
-->
* Type: {ReadableStream}
#### `compressionStream.writable`
<!-- YAML
added: REPLACEME
-->
* Type: {WritableStream}
### Class: `DecompressionStream`
<!-- YAML
added: REPLACEME
-->
#### `new DecompressionStream(format)`
<!-- YAML
added: REPLACEME
-->
* `format` {string} One of either `'deflate'` or `'gzip'`.
#### `decompressionStream.readable`
<!-- YAML
added: REPLACEME
-->
* Type: {ReadableStream}
#### `deccompressionStream.writable`
<!-- YAML
added: REPLACEME
-->
* Type: {WritableStream}
### Utility Consumers
<!-- YAML
added: REPLACEME
-->
The utility consumer functions provide common options for consuming
streams.
They are accessed using:
```mjs
import {
arrayBuffer,
blob,
json,
text,
} from 'node:stream/consumers';
```
```cjs
const {
arrayBuffer,
blob,
json,
text,
} = require('stream/consumers');
```
#### `streamConsumers.arrayBuffer(stream)`
<!-- YAML
added: REPLACEME
-->
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with an `ArrayBuffer` containing the full
contents of the stream.
#### `streamConsumers.blob(stream)`
<!-- YAML
added: REPLACEME
-->
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with a {Blob} containing the full contents
of the stream.
#### `streamConsumers.buffer(stream)`
<!-- YAML
added: REPLACEME
-->
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with a {Buffer} containing the full
contents of the stream.
#### `streamConsumers.json(stream)`
<!-- YAML
added: REPLACEME
-->
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with the contents of the stream parsed as a
UTF-8 encoded string that is then passed through `JSON.parse()`.
#### `streamConsumers.text(stream)`
<!-- YAML
added: REPLACEME
-->
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with the contents of the stream parsed as a
UTF-8 encoded string.
[Streams]: stream.md
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/
84 changes: 84 additions & 0 deletions lib/stream/consumers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
'use strict';

const {
JSONParse,
} = primordials;

const {
TextDecoder,
} = require('internal/encoding');

const {
Blob,
} = require('internal/blob');

const {
Buffer,
} = require('buffer');

/**
* @typedef {import('../internal/webstreams/readablestream').ReadableStream
* } ReadableStream
* @typedef {import('../internal/streams/readable')} Readable
*/

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<Blob>}
*/
async function blob(stream) {
const chunks = [];
for await (const chunk of stream)
chunks.push(chunk);
return new Blob(chunks);
}

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<ArrayBuffer>}
*/
async function arrayBuffer(stream) {
const ret = await blob(stream);
return ret.arrayBuffer();
}

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<Buffer>}
*/
async function buffer(stream) {
return Buffer.from(await arrayBuffer(stream));
}

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<string>}
*/
async function text(stream) {
const dec = new TextDecoder();
let str = '';
for await (const chunk of stream) {
if (typeof chunk === 'string')
str += chunk;
else
str += dec.decode(chunk, { stream: true });
}
return str;
}

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<any>}
*/
async function json(stream) {
const str = await text(stream);
return JSONParse(str);
}

module.exports = {
arrayBuffer,
blob,
buffer,
text,
json,
};
Loading

0 comments on commit f57a0e4

Please sign in to comment.