Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: construct #2

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 172 additions & 2 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,15 @@ added: v12.3.0

Getter for the property `objectMode` of a given `Writable` stream.

##### writable.writableReady
<!-- YAML
added: REPLACEME
-->

* {boolean}

Is set to `true` immediately before the [`'ready'`][] event is emitted.

##### writable.write(chunk[, encoding][, callback])
<!-- YAML
added: v0.9.4
Expand Down Expand Up @@ -1169,6 +1178,15 @@ added: v12.3.0

Getter for the property `objectMode` of a given `Readable` stream.

##### readable.readableReady
<!-- YAML
added: REPLACEME
-->

* {boolean}

Is set to `true` immediately before the [`'ready'`][] event is emitted.

##### readable.resume()
<!-- YAML
added: v0.9.4
Expand Down Expand Up @@ -1622,8 +1640,8 @@ on the type of stream being created, as detailed in the chart below:
| Use-case | Class | Method(s) to implement |
| -------- | ----- | ---------------------- |
| Reading only | [`Readable`] | <code>[_read()][stream-_read]</code> |
| Writing only | [`Writable`] | <code>[_write()][stream-_write]</code>, <code>[_writev()][stream-_writev]</code>, <code>[_final()][stream-_final]</code> |
| Reading and writing | [`Duplex`] | <code>[_read()][stream-_read]</code>, <code>[_write()][stream-_write]</code>, <code>[_writev()][stream-_writev]</code>, <code>[_final()][stream-_final]</code> |
| Writing only | [`Writable`] | <code>[_write()][stream-_write]</code>, <code>[_writev()][stream-_writev]</code>, <code>[_final()][stream-_final]</code>, <code>[_construct()][stream-_construct]</code> |
| Reading and writing | [`Duplex`] | <code>[_read()][stream-_read]</code>, <code>[_write()][stream-_write]</code>, <code>[_writev()][stream-_writev]</code>, <code>[_final()][stream-_final]</code>, <code>[_construct()][stream-_construct]</code> |
| Operate on written data, then read the result | [`Transform`] | <code>[_transform()][stream-_transform]</code>, <code>[_flush()][stream-_flush]</code>, <code>[_final()][stream-_final]</code> |

The implementation code for a stream should *never* call the "public" methods
Expand All @@ -1645,8 +1663,14 @@ objects and passing appropriate methods as constructor options.
const { Writable } = require('stream');

const myWritable = new Writable({
construct(options, callback) {
// Initialize state and load resources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Free resources...
}
});
```
Expand Down Expand Up @@ -1700,6 +1724,8 @@ changes:
[`stream._destroy()`][writable-_destroy] method.
* `final` {Function} Implementation for the
[`stream._final()`][stream-_final] method.
* `construct` {Function} Implementation for the
[`stream._construct()`][stream-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `false`.

Expand Down Expand Up @@ -1745,6 +1771,57 @@ const myWritable = new Writable({
});
```

#### writable.\_construct(options, callback)
<!-- YAML
added: REPLACEME
-->

* `options` {Object} Options passed to constructor.
* `callback` {Function} Call this function (optionally with an error
argument) when finished writing any remaining data.

The `_construct()` method **must not** be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Writable`
class methods only.

This optional function will be called by the stream constructor,
delaying the `'ready'` event until `callback` is called. This is useful to
initalize state or asynchronously initialize resources before the stream
can be used.

```js
const { Writable } = require('stream');
const fs = require('fs');

class WriteStream extends Writable {
constructor(filename) {
super({ filename, autoDestroy: true });
}
_construct({ filename }, callback) {
this.filename = filename;
this.fd = null;
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
```

#### writable.\_write(chunk, encoding, callback)

* `chunk` {Buffer|string|any} The `Buffer` to be written, converted from the
Expand Down Expand Up @@ -1958,6 +2035,8 @@ changes:
method.
* `destroy` {Function} Implementation for the
[`stream._destroy()`][readable-_destroy] method.
* `construct` {Function} Implementation for the
[`stream._construct()`][readable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `false`.

Expand Down Expand Up @@ -2000,6 +2079,64 @@ const myReadable = new Readable({
});
```

#### readable.\_construct(options, callback)
<!-- YAML
added: REPLACEME
-->

* `options` {Object} Options passed to constructor.
* `callback` {Function} Call this function (optionally with an error
argument) when finished writing any remaining data.

The `_construct()` method **must not** be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Writable`
class methods only.

This optional function will be called by the stream constructor,
delaying the `'ready'` event until `callback` is called. This is useful to
initalize state or asynchronously initialize resources before the stream
can be used.

```js
const { Readable } = require('stream');
const fs = require('fs');

class ReadStream extends Readable {
constructor(filename) {
super({ autoDestroy: true, filename });
}
_construct({ filename }, callback) {
this.filename = filename;
this.fd = null;
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf.slice(bytesRead) : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
```

#### readable.\_read(size)
<!-- YAML
added: v0.9.4
Expand Down Expand Up @@ -2255,6 +2392,38 @@ const myDuplex = new Duplex({
});
```

When using pipeline:

```js
const { Duplex } = require('stream');
const fs = require('fs');

stream.pipeline(
fs.createReadStream('object.json')
.setEncoding('utf-8'),
new Duplex({
construct(options, callback) {
this.data = '';
callback();
},
transform(chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
} catch (err) {
callback(err);
}
}
}),
fs.createWriteStream('valid-object.json')
);
```

#### An Example Duplex Stream

The following illustrates a simple example of a `Duplex` stream that wraps a
Expand Down Expand Up @@ -2748,6 +2917,7 @@ contain multi-byte characters.
[object-mode]: #stream_object_mode
[readable-_destroy]: #stream_readable_destroy_err_callback
[readable-destroy]: #stream_readable_destroy_error
[stream-_construct]: #stream_writable_construct_options_callback
[stream-_final]: #stream_writable_final_callback
[stream-_flush]: #stream_transform_flush_callback
[stream-_read]: #stream_readable_read_size_1
Expand Down
10 changes: 10 additions & 0 deletions lib/_stream_duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', {
}
});

Object.defineProperty(Duplex.prototype, 'writableReady', {
// Making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._writableState && this._writableState.status === 2;
}
});

Object.defineProperty(Duplex.prototype, 'writableBuffer', {
// Making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
Expand Down
37 changes: 34 additions & 3 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ function ReadableState(options, stream, isDuplex) {
this.endEmitted = false;
this.reading = false;

// Stream is still being constructed and no operations
// can take place until construction finished.
this.ready = false;

// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
this.pending = true;

// Callback to continue destruction after constrution
// has finished or failed.
this.destroyCallback = null;

// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
Expand Down Expand Up @@ -178,9 +190,22 @@ function Readable(options) {

if (typeof options.destroy === 'function')
this._destroy = options.destroy;

if (typeof options.construct === 'function')
this._construct = options.construct;
}

Stream.call(this);

if (typeof this._construct === 'function') {
this.once('ready', function() {
maybeReadMore(this, this._readableState);
});
destroyImpl.construct(this, options);
} else {
this._readableState.pending = false;
this._readableState.ready = true;
}
}

Object.defineProperty(Readable.prototype, 'destroyed', {
Expand Down Expand Up @@ -489,9 +514,9 @@ Readable.prototype.read = function(n) {

// However, if we've ended, then there's no point, and if we're already
// reading, then it's unnecessary.
if (state.ended || state.reading) {
if (state.ended || state.reading || state.pending) {
doRead = false;
debug('reading or ended', doRead);
debug('reading, ended or pending', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
Expand Down Expand Up @@ -643,7 +668,7 @@ function maybeReadMore_(stream, state) {
// called push() with new data. In this case we skip performing more
// read()s. The execution ends in this method again after the _read() ends
// up calling push() with more data.
while (!state.reading && !state.ended &&
while (!state.reading && !state.ended && !state.destroyed &&
(state.length < state.highWaterMark ||
(state.flowing && state.length === 0))) {
const len = state.length;
Expand Down Expand Up @@ -1077,6 +1102,12 @@ Readable.prototype[Symbol.asyncIterator] = function() {
return createReadableStreamAsyncIterator(this);
};

Object.defineProperty(Readable.prototype, 'readableReady', {
get() {
return this._readableState.ready;
}
});

Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
// Making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
Expand Down
Loading