Skip to content

Commit

Permalink
doc: add stream/promises pipeline and finished to doc
Browse files Browse the repository at this point in the history
PR-URL: #45832
Fixes: #45821
Reviewed-By: Paolo Insogna <paolo@cowtech.it>
Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
  • Loading branch information
marco-ippolito authored and juanarbol committed Jan 24, 2023
1 parent 4aaec07 commit 5eb93f1
Showing 1 changed file with 227 additions and 107 deletions.
334 changes: 227 additions & 107 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,227 @@ functions for streams that return `Promise` objects rather than using
callbacks. The API is accessible via `require('node:stream/promises')`
or `require('node:stream').promises`.

### `stream.pipeline(source[, ...transforms], destination[, options])`

### `stream.pipeline(streams[, options])`

<!-- YAML
added: v15.0.0
-->

* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
* `source` {Stream|Iterable|AsyncIterable|Function}
* Returns: {Promise|AsyncIterable}
* `...transforms` {Stream|Function}
* `source` {AsyncIterable}
* Returns: {Promise|AsyncIterable}
* `destination` {Stream|Function}
* `source` {AsyncIterable}
* Returns: {Promise|AsyncIterable}
* `options` {Object}
* `signal` {AbortSignal}
* `end` {boolean}
* Returns: {Promise} Fulfills when the pipeline is complete.

```cjs
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

```mjs
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
```

To use an `AbortSignal`, pass it inside an options object, as the last argument.
When the signal is aborted, `destroy` will be called on the underlying pipeline,
with an `AbortError`.

```cjs
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
const ac = new AbortController();
const signal = ac.signal;

setImmediate(() => ac.abort());
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
{ signal },
);
}

run().catch(console.error); // AbortError
```

```mjs
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
{ signal },
);
} catch (err) {
console.error(err); // AbortError
}
```

The `pipeline` API also supports async generators:

```cjs
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

```mjs
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
```

Remember to handle the `signal` argument passed into the async generator.
Especially in the case where the async generator is the source for the
pipeline (i.e. first argument) or the pipeline will never complete.

```cjs
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

```mjs
import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
```

The `pipeline` API provides [callback version][stream-pipeline]:

### `stream.finished(stream[, options])`

<!-- YAML
added: v15.0.0
-->

* `stream` {Stream}
* `options` {Object}
* `error` {boolean|undefined}
* `readable` {boolean|undefined}
* `writable` {boolean|undefined}
* `signal`: {AbortSignal|undefined}
* Returns: {Promise} Fulfills when the stream is no
longer readable or writable.

```cjs
const { finished } = require('node:stream/promises');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

async function run() {
await finished(rs);
console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.
```

```mjs
import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';

const rs = createReadStream('archive.tar');

async function run() {
await finished(rs);
console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.
```

The `finished` API provides [callback version][stream-finished]:

### Object mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
Expand Down Expand Up @@ -2425,22 +2646,7 @@ Especially useful in error handling scenarios where a stream is destroyed
prematurely (like an aborted HTTP request), and will not emit `'end'`
or `'finish'`.

The `finished` API provides promise version:

```js
const { finished } = require('node:stream/promises');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

async function run() {
await finished(rs);
console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.
```
The `finished` API provides [promise version][stream-finished-promise].

`stream.finished()` leaves dangling event listeners (in particular
`'error'`, `'end'`, `'finish'` and `'close'`) after `callback` has been
Expand Down Expand Up @@ -2520,97 +2726,7 @@ pipeline(
);
```

The `pipeline` API provides a promise version, which can also
receive an options argument as the last parameter with a
`signal` {AbortSignal} property. When the signal is aborted,
`destroy` will be called on the underlying pipeline, with an
`AbortError`.

```js
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

To use an `AbortSignal`, pass it inside an options object,
as the last argument:

```js
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
const ac = new AbortController();
const signal = ac.signal;

setTimeout(() => ac.abort(), 1);
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
{ signal },
);
}

run().catch(console.error); // AbortError
```

The `pipeline` API also supports async generators:

```js
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

Remember to handle the `signal` argument passed into the async generator.
Especially in the case where the async generator is the source for the
pipeline (i.e. first argument) or the pipeline will never complete.

```js
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}

run().catch(console.error);
```
The `pipeline` API provides a [promise version][stream-pipeline-promise].

`stream.pipeline()` will call `stream.destroy(err)` on all streams except:

Expand Down Expand Up @@ -4544,7 +4660,11 @@ contain multi-byte characters.
[stream-_write]: #writable_writechunk-encoding-callback
[stream-_writev]: #writable_writevchunks-callback
[stream-end]: #writableendchunk-encoding-callback
[stream-finished]: #streamfinishedstream-options-callback
[stream-finished-promise]: #streamfinishedstream-options
[stream-pause]: #readablepause
[stream-pipeline]: #streampipelinesource-transforms-destination-callback
[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options
[stream-push]: #readablepushchunk-encoding
[stream-read]: #readablereadsize
[stream-resume]: #readableresume
Expand Down

0 comments on commit 5eb93f1

Please sign in to comment.