Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: construct #29656

Closed
wants to merge 2 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
stream: construct
Provide a standardized way of asynchronously creating and
initializing resources before performing any work.

Refs: #29314
  • Loading branch information
ronag committed May 25, 2020
commit 2fe10448380c0784fc79ce7e3d70fc207788e8b9
171 changes: 164 additions & 7 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
@@ -550,8 +550,7 @@ added: v9.3.0

* {number}

Return the value of `highWaterMark` passed when constructing this
`Writable`.
Return the value of `highWaterMark` passed when creating this `Writable`.
yorkie marked this conversation as resolved.
Show resolved Hide resolved

##### `writable.writableLength`
<!-- YAML
@@ -1193,8 +1192,7 @@ added: v9.3.0

* {number}

Returns the value of `highWaterMark` passed when constructing this
`Readable`.
Returns the value of `highWaterMark` passed when creating this `Readable`.

##### `readable.readableLength`
<!-- YAML
@@ -1792,7 +1790,7 @@ expectations.
added: v1.2.0
-->

For many simple cases, it is possible to construct a stream without relying on
For many simple cases, it is possible to create a stream without relying on
inheritance. This can be accomplished by directly creating instances of the
`stream.Writable`, `stream.Readable`, `stream.Duplex` or `stream.Transform`
objects and passing appropriate methods as constructor options.
@@ -1801,8 +1799,14 @@ objects and passing appropriate methods as constructor options.
const { Writable } = require('stream');

const myWritable = new Writable({
construct(callback) {
// Initialize state and load resources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
ronag marked this conversation as resolved.
Show resolved Hide resolved
// Free resources...
}
});
```
@@ -1861,6 +1865,8 @@ changes:
[`stream._destroy()`][writable-_destroy] method.
* `final` {Function} Implementation for the
[`stream._final()`][stream-_final] method.
* `construct` {Function} Implementation for the
[`stream._construct()`][writable-_construct] method.
ronag marked this conversation as resolved.
Show resolved Hide resolved
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.

@@ -1906,6 +1912,56 @@ const myWritable = new Writable({
});
```

#### `writable._construct(callback)`
<!-- YAML
added: REPLACEME
-->

* `callback` {Function} Call this function (optionally with an error
argument) when the stream has finished initializing.

The `_construct()` method MUST NOT be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Writable`
class methods only.

This optional function will be called in a tick after the stream constructor
has returned, delaying any `_write`, `_final` and `_destroy` calls until
`callback` is called. This is useful to initialize state or asynchronously
initialize resources before the stream can be used.

```js
const { Writable } = require('stream');
const fs = require('fs');

class WriteStream extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = fd;
}
_construct(callback) {
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
```

#### `writable._write(chunk, encoding, callback)`
<!-- YAML
changes:
@@ -2130,6 +2186,8 @@ changes:
method.
* `destroy` {Function} Implementation for the
[`stream._destroy()`][readable-_destroy] method.
* `construct` {Function} Implementation for the
[`stream._construct()`][readable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.

@@ -2172,6 +2230,63 @@ const myReadable = new Readable({
});
```

#### `readable._construct(callback)`
<!-- YAML
added: REPLACEME
-->

* `callback` {Function} Call this function (optionally with an error
argument) when the stream has finished initializing.

The `_construct()` method MUST NOT be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Readable`
class methods only.

This optional function will be called by the stream constructor,
delaying any `_read` and `_destroy` calls until `callback` is called. This is
useful to initialize state or asynchronously initialize resources before the
stream can be used.

```js
const { Readable } = require('stream');
const fs = require('fs');

class ReadStream extends Readable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
```

#### `readable._read(size)`
<!-- YAML
added: v0.9.4
@@ -2427,6 +2542,46 @@ const myDuplex = new Duplex({
});
```

When using pipeline:

```js
const { Transform, pipeline } = require('stream');
const fs = require('fs');

pipeline(
fs.createReadStream('object.json')
.setEncoding('utf-8'),
new Transform({
decodeStrings: false, // Accept string input rather than Buffers
construct(callback) {
this.data = '';
callback();
},
transform(chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
} catch (err) {
callback(err);
}
}
}),
fs.createWriteStream('valid-object.json'),
(err) => {
if (err) {
console.error('failed', err);
} else {
console.log('completed');
}
}
);
ronag marked this conversation as resolved.
Show resolved Hide resolved
```

#### An Example Duplex Stream

The following illustrates a simple example of a `Duplex` stream that wraps a
@@ -2706,8 +2861,8 @@ unhandled post-destroy errors.

#### Creating Readable Streams with Async Generators

We can construct a Node.js Readable Stream from an asynchronous generator
using the `Readable.from()` utility method:
A Node.js Readable Stream can be created from an asynchronous generator using
ronag marked this conversation as resolved.
Show resolved Hide resolved
the `Readable.from()` utility method:

```js
const { Readable } = require('stream');
@@ -2960,6 +3115,7 @@ contain multi-byte characters.
[http-incoming-message]: http.html#http_class_http_incomingmessage
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
[object-mode]: #stream_object_mode
[readable-_construct]: #stream_readable_construct_callback
[readable-_destroy]: #stream_readable_destroy_err_callback
[readable-destroy]: #stream_readable_destroy_error
[stream-_final]: #stream_writable_final_callback
@@ -2976,6 +3132,7 @@ contain multi-byte characters.
[stream-uncork]: #stream_writable_uncork
[stream-write]: #stream_writable_write_chunk_encoding_callback
[Stream Three States]: #stream_three_states
[writable-_construct]: #stream_writable_construct_callback
[writable-_destroy]: #stream_writable_destroy_err_callback
[writable-destroy]: #stream_writable_destroy_error
[writable-new]: #stream_constructor_new_stream_writable_options
24 changes: 19 additions & 5 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
@@ -118,6 +118,12 @@ function ReadableState(options, stream, isDuplex) {
this.endEmitted = false;
this.reading = false;

// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true;

// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
@@ -197,9 +203,16 @@ function Readable(options) {

if (typeof options.destroy === 'function')
this._destroy = options.destroy;

if (typeof options.construct === 'function')
this._construct = options.construct;
}

Stream.call(this, options);

destroyImpl.construct(this, () => {
maybeReadMore(this, this._readableState);
});
}

Readable.prototype.destroy = destroyImpl.destroy;
@@ -461,11 +474,12 @@ Readable.prototype.read = function(n) {
}

// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, and if we're destroyed or errored,
// then it's not allowed.
if (state.ended || state.reading || state.destroyed || state.errored) {
// reading, then it's unnecessary, if we're constructing we have to wait,
// and if we're destroyed or errored, then it's not allowed,
if (state.ended || state.reading || state.destroyed || state.errored ||
!state.constructed) {
doRead = false;
debug('reading or ended', doRead);
debug('reading, ended or constructing', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
@@ -587,7 +601,7 @@ function emitReadable_(stream) {
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
function maybeReadMore(stream, state) {
if (!state.readingMore) {
if (!state.readingMore && state.constructed) {
state.readingMore = true;
process.nextTick(maybeReadMore_, stream, state);
}
27 changes: 25 additions & 2 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
@@ -155,6 +155,12 @@ function WritableState(options, stream, isDuplex) {
// this must be 0 before 'finish' can be emitted.
this.pendingcb = 0;

// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true;

// Emit prefinish if the only thing we're waiting for is _write cbs
// This is relevant for synchronous Transform streams.
this.prefinished = false;
@@ -249,9 +255,22 @@ function Writable(options) {

if (typeof options.final === 'function')
this._final = options.final;

if (typeof options.construct === 'function')
this._construct = options.construct;
}

Stream.call(this, options);

destroyImpl.construct(this, () => {
const state = this._writableState;

if (!state.writing) {
clearBuffer(this, state);
}

finishMaybe(this, state);
});
}

// Otherwise people can pipe Writable streams, which is just wrong.
@@ -342,7 +361,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {

state.length += len;

if (state.writing || state.corked || state.errored) {
if (state.writing || state.corked || state.errored || !state.constructed) {
state.buffered.push({ chunk, encoding, callback });
if (state.allBuffers && encoding !== 'buffer') {
state.allBuffers = false;
@@ -492,7 +511,10 @@ function errorBuffer(state, err) {

// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
if (state.corked || state.bufferProcessing || state.destroyed) {
if (state.corked ||
state.bufferProcessing ||
state.destroyed ||
!state.constructed) {
return;
}

@@ -600,6 +622,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {

function needFinish(state) {
return (state.ending &&
state.constructed &&
state.length === 0 &&
!state.errored &&
state.buffered.length === 0 &&
Loading