Skip to content

Commit

Permalink
doc: fix nits in stream.md
Browse files Browse the repository at this point in the history
* Unify periods and upper case in comments.
* Add missing parentheses for methods.
* Add missing backticks.
* Fix sorting position of `writable.writableFinished` section.
* Replace a one-letter variable with a more readable one.
* `catch(console.log)` -> `catch(console.error)`.
* Document missing `emitClose` option in `new stream.Readable()` section
  mentioned in https://nodejs.org/api/stream.html#stream_event_close_1
  and https://nodejs.org/api/stream.html#stream_readable_destroy_error
  copying from the `new stream.Writable()` section.
  Refs: https://github.com/nodejs/node/blob/36fdf1aa6c87ccfaebabb8f9c8004baab0549b0b/lib/_stream_readable.js#L121

PR-URL: #28591
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
  • Loading branch information
vsemozhetbyt authored and targos committed Jul 20, 2019
1 parent 0380a55 commit 871a60c
Showing 1 changed file with 57 additions and 55 deletions.
112 changes: 57 additions & 55 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,20 +115,20 @@ that implements an HTTP server:
const http = require('http');

const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream
// `req` is an http.IncomingMessage, which is a Readable Stream.
// `res` is an http.ServerResponse, which is a Writable Stream.

let body = '';
// Get the data as utf8 strings.
// If an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');

// Readable streams emit 'data' events once a listener is added
// Readable streams emit 'data' events once a listener is added.
req.on('data', (chunk) => {
body += chunk;
});

// The 'end' event indicates that the entire body has been received
// The 'end' event indicates that the entire body has been received.
req.on('end', () => {
try {
const data = JSON.parse(body);
Expand Down Expand Up @@ -250,7 +250,7 @@ function writeOneMillionTimes(writer, data, encoding, callback) {
do {
i--;
if (i === 0) {
// last time!
// Last time!
writer.write(data, encoding, callback);
} else {
// See if we should continue, or wait.
Expand All @@ -259,8 +259,8 @@ function writeOneMillionTimes(writer, data, encoding, callback) {
}
} while (i > 0 && ok);
if (i > 0) {
// had to stop early!
// write some more once it drains
// Had to stop early!
// Write some more once it drains.
writer.once('drain', write);
}
}
Expand Down Expand Up @@ -410,7 +410,7 @@ Calling the [`stream.write()`][stream-write] method after calling
[`stream.end()`][stream-end] will raise an error.

```js
// Write 'hello, ' and then end with 'world!'
// Write 'hello, ' and then end with 'world!'.
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
Expand Down Expand Up @@ -480,6 +480,15 @@ added: v11.4.0

Is `true` if it is safe to call [`writable.write()`][stream-write].

##### writable.writableFinished
<!-- YAML
added: v12.6.0
-->

* {boolean}

Is `true` if after the [`'finish'`][] event has been emitted.

##### writable.writableHighWaterMark
<!-- YAML
added: v9.3.0
Expand All @@ -499,16 +508,6 @@ This property contains the number of bytes (or objects) in the queue
ready to be written. The value provides introspection data regarding
the status of the `highWaterMark`.

##### writable.writableFinished
<!-- YAML
added: v12.6.0
-->

* {boolean}

Is `true` if all data has been flushed to the underlying system. After
the [`'finish'`][] event has been emitted.

##### writable.writableObjectMode
<!-- YAML
added: v12.3.0
Expand Down Expand Up @@ -694,11 +693,11 @@ const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false
// readableFlowing is now false.

pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok'); // Will not emit 'data'
pass.resume(); // Must be called to make stream emit 'data'
pass.write('ok'); // Will not emit 'data'.
pass.resume(); // Must be called to make stream emit 'data'.
```

While `readable.readableFlowing` is `false`, data may be accumulating
Expand Down Expand Up @@ -841,7 +840,7 @@ cause some amount of data to be read into an internal buffer.
```javascript
const readable = getReadableStreamSomehow();
readable.on('readable', function() {
// There is some data to read now
// There is some data to read now.
let data;

while (data = this.read()) {
Expand Down Expand Up @@ -986,7 +985,7 @@ named `file.txt`:
const fs = require('fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
// All the data from readable goes into 'file.txt'.
readable.pipe(writable);
```
It is possible to attach multiple `Writable` streams to a single `Readable`
Expand Down Expand Up @@ -1061,7 +1060,7 @@ readable.on('readable', () => {

The `while` loop is necessary when processing data with
`readable.read()`. Only after `readable.read()` returns `null`,
[`'readable'`]() will be emitted.
[`'readable'`][] will be emitted.

A `Readable` stream in object mode will always return a single item from
a call to [`readable.read(size)`][stream-read], regardless of the value of the
Expand Down Expand Up @@ -1192,7 +1191,7 @@ const fs = require('fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
console.log('Stop writing to file.txt.');
Expand Down Expand Up @@ -1231,9 +1230,9 @@ use of a [`Transform`][] stream instead. See the [API for Stream Implementers][]
section for more information.

```js
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('string_decoder');
function parseHeader(stream, callback) {
stream.on('error', callback);
Expand All @@ -1245,13 +1244,13 @@ function parseHeader(stream, callback) {
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk);
if (str.match(/\n\n/)) {
// Found the header boundary
// Found the header boundary.
const split = str.split(/\n\n/);
header += split.shift();
const remaining = split.join('\n\n');
const buf = Buffer.from(remaining, 'utf8');
stream.removeListener('error', callback);
// Remove the 'readable' listener before unshifting
// Remove the 'readable' listener before unshifting.
stream.removeListener('readable', onReadable);
if (buf.length)
stream.unshift(buf);
Expand Down Expand Up @@ -1323,13 +1322,13 @@ const fs = require('fs');
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const k of readable) {
data += k;
for await (const chunk of readable) {
data += chunk;
}
console.log(data);
}

print(fs.createReadStream('file')).catch(console.log);
print(fs.createReadStream('file')).catch(console.error);
```

If the loop terminates with a `break` or a `throw`, the stream will be
Expand Down Expand Up @@ -1425,7 +1424,7 @@ finished(rs, (err) => {
}
});

rs.resume(); // drain the stream
rs.resume(); // Drain the stream.
```

Especially useful in error handling scenarios where a stream is destroyed
Expand All @@ -1445,7 +1444,7 @@ async function run() {
}

run().catch(console.error);
rs.resume(); // drain the stream
rs.resume(); // Drain the stream.
```

### stream.pipeline(...streams, callback)
Expand Down Expand Up @@ -1508,6 +1507,7 @@ run().catch(console.error);
* `options` {Object} Options provided to `new stream.Readable([options])`.
By default, `Readable.from()` will set `options.objectMode` to `true`, unless
this is explicitly opted out by setting `options.objectMode` to `false`.
* Returns: {stream.Readable}

A utility method for creating Readable Streams out of iterators.

Expand Down Expand Up @@ -1555,10 +1555,10 @@ on the type of stream being created, as detailed in the chart below:

| Use-case | Class | Method(s) to implement |
| -------- | ----- | ---------------------- |
| Reading only | [`Readable`] | <code>[_read][stream-_read]</code> |
| Writing only | [`Writable`] | <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>, <code>[_final][stream-_final]</code> |
| Reading and writing | [`Duplex`] | <code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>, <code>[_final][stream-_final]</code> |
| Operate on written data, then read the result | [`Transform`] | <code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code>, <code>[_final][stream-_final]</code> |
| Reading only | [`Readable`] | <code>[_read()][stream-_read]</code> |
| Writing only | [`Writable`] | <code>[_write()][stream-_write]</code>, <code>[_writev()][stream-_writev]</code>, <code>[_final()][stream-_final]</code> |
| Reading and writing | [`Duplex`] | <code>[_read()][stream-_read]</code>, <code>[_write()][stream-_write]</code>, <code>[_writev()][stream-_writev]</code>, <code>[_final()][stream-_final]</code> |
| Operate on written data, then read the result | [`Transform`] | <code>[_transform()][stream-_transform]</code>, <code>[_flush()][stream-_flush]</code>, <code>[_final()][stream-_final]</code> |

The implementation code for a stream should *never* call the "public" methods
of a stream that are intended for use by consumers (as described in the
Expand Down Expand Up @@ -1643,7 +1643,7 @@ const { Writable } = require('stream');

class MyWritable extends Writable {
constructor(options) {
// Calls the stream.Writable() constructor
// Calls the stream.Writable() constructor.
super(options);
// ...
}
Expand Down Expand Up @@ -1886,6 +1886,8 @@ changes:
* `objectMode` {boolean} Whether this stream should behave
as a stream of objects. Meaning that [`stream.read(n)`][stream-read] returns
a single value instead of a `Buffer` of size `n`. **Default:** `false`.
* `emitClose` {boolean} Whether or not the stream should emit `'close'`
after it has been destroyed. **Default:** `true`.
* `read` {Function} Implementation for the [`stream._read()`][stream-_read]
method.
* `destroy` {Function} Implementation for the
Expand All @@ -1899,7 +1901,7 @@ const { Readable } = require('stream');

class MyReadable extends Readable {
constructor(options) {
// Calls the stream.Readable(options) constructor
// Calls the stream.Readable(options) constructor.
super(options);
// ...
}
Expand Down Expand Up @@ -2026,18 +2028,18 @@ class SourceWrapper extends Readable {

// Every time there's data, push it into the internal buffer.
this._source.ondata = (chunk) => {
// If push() returns false, then stop reading from source
// If push() returns false, then stop reading from source.
if (!this.push(chunk))
this._source.readStop();
};

// When the source ends, push the EOF-signaling `null` chunk
// When the source ends, push the EOF-signaling `null` chunk.
this._source.onend = () => {
this.push(null);
};
}
// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
// _read() will be called when the stream wants to pull more data in.
// The advisory size argument is ignored in this case.
_read(size) {
this._source.readStart();
}
Expand Down Expand Up @@ -2070,7 +2072,7 @@ const myReadable = new Readable({
process.nextTick(() => this.emit('error', err));
return;
}
// do some work
// Do some work.
}
});
```
Expand Down Expand Up @@ -2208,7 +2210,7 @@ class MyDuplex extends Duplex {
}

_write(chunk, encoding, callback) {
// The underlying source only deals with strings
// The underlying source only deals with strings.
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
Expand Down Expand Up @@ -2241,12 +2243,12 @@ the `Readable` side.
```js
const { Transform } = require('stream');

// All Transform streams are also Duplex Streams
// All Transform streams are also Duplex Streams.
const myTransform = new Transform({
writableObjectMode: true,

transform(chunk, encoding, callback) {
// Coerce the chunk to a number if necessary
// Coerce the chunk to a number if necessary.
chunk |= 0;

// Transform the chunk into something else.
Expand Down Expand Up @@ -2385,7 +2387,7 @@ user programs.
[`stream.write()`][stream-write].
* `encoding` {string} If the chunk is a string, then this is the
encoding type. If chunk is a buffer, then this is the special
value - 'buffer', ignore it in this case.
value - `'buffer'`, ignore it in this case.
* `callback` {Function} A callback function (optionally with an error
argument and data) to be called after the supplied `chunk` has been
processed.
Expand Down Expand Up @@ -2493,12 +2495,12 @@ const writeable = fs.createWriteStream('./file');

(async function() {
for await (const chunk of iterator) {
// Handle backpressure on write
// Handle backpressure on write().
if (!writeable.write(chunk))
await once(writeable, 'drain');
}
writeable.end();
// Ensure completion without errors
// Ensure completion without errors.
await once(writeable, 'finish');
})();
```
Expand All @@ -2517,7 +2519,7 @@ const writeable = fs.createWriteStream('./file');
(async function() {
const readable = Readable.from(iterator);
readable.pipe(writeable);
// Ensure completion without errors
// Ensure completion without errors.
await once(writeable, 'finish');
})();
```
Expand Down Expand Up @@ -2560,7 +2562,7 @@ For example, consider the following code:
// WARNING! BROKEN!
net.createServer((socket) => {

// We add an 'end' listener, but never consume the data
// We add an 'end' listener, but never consume the data.
socket.on('end', () => {
// It will never get here.
socket.end('The message was received but was not processed.\n');
Expand All @@ -2576,7 +2578,7 @@ The workaround in this situation is to call the
[`stream.resume()`][stream-resume] method to begin the flow of data:

```js
// Workaround
// Workaround.
net.createServer((socket) => {
socket.on('end', () => {
socket.end('The message was received but was not processed.\n');
Expand Down

0 comments on commit 871a60c

Please sign in to comment.