Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add errored and closed props #40696

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,16 @@ further errors except from `_destroy()` may be emitted as `'error'`.
Implementors should not override this method,
but instead implement [`writable._destroy()`][writable-_destroy].

##### `writable.closed`

<!-- YAML
added: REPLACEME
-->

* {boolean}

Is `true` after `'close'` has been emitted.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Is it also true while it is being emitted? Personally, I'd interpret "after 'close' has been emitted" as in the following snippet, but that's probably not what is meant.

let value = false;
const e = new (require('events').EventEmitter)();

e.addListener('close', () => console.log(value)); // prints false

e.emit('close');

// *after* close has been emitted
value = true;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true enough, but that's how the other props that work similarly are also documented.


##### `writable.destroyed`

<!-- YAML
Expand Down Expand Up @@ -611,6 +621,17 @@ added:
Number of times [`writable.uncork()`][stream-uncork] needs to be
called in order to fully uncork the stream.

##### `writable.writableErrored`

<!-- YAML
added:
REPLACEME
-->

* {Error}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting that it may not be an actual Error object since we can error with any value


Returns error if the stream has been destroyed with an error.

##### `writable.writableFinished`

<!-- YAML
Expand Down Expand Up @@ -1080,14 +1101,24 @@ further errors except from `_destroy()` may be emitted as `'error'`.
Implementors should not override this method, but instead implement
[`readable._destroy()`][readable-_destroy].

##### `readable.destroyed`
##### `readable.closed`

<!-- YAML
added: v8.0.0
-->

* {boolean}

Is `true` after `'close'` has been emitted.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dito.


##### `readable.destroyed`

<!-- YAML
added: REPLACEME
-->

* {boolean}

Is `true` after [`readable.destroy()`][readable-destroy] has been called.

##### `readable.isPaused()`
Expand Down Expand Up @@ -1346,6 +1377,17 @@ added: v12.9.0

Becomes `true` when [`'end'`][] event is emitted.

##### `readable.readableErrored`

<!-- YAML
added:
REPLACEME
-->

* {Error}

Returns error if the stream has been destroyed with an error.

##### `readable.readableFlowing`

<!-- YAML
Expand Down
6 changes: 0 additions & 6 deletions lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,9 @@ const FileHandleOperations = (handle) => {

function close(stream, err, cb) {
if (!stream.fd) {
// TODO(ronag)
// stream.closed = true;
cb(err);
} else {
stream[kFs].close(stream.fd, (er) => {
stream.closed = true;
cb(er || err);
});
stream.fd = null;
Expand Down Expand Up @@ -186,7 +183,6 @@ function ReadStream(path, options) {
this.end = options.end;
this.pos = undefined;
this.bytesRead = 0;
this.closed = false;
this[kIsPerformingIO] = false;

if (this.start !== undefined) {
Expand Down Expand Up @@ -358,10 +354,8 @@ function WriteStream(path, options) {
this.start = options.start;
this.pos = undefined;
this.bytesWritten = 0;
this.closed = false;
this[kIsPerformingIO] = false;


if (this.start !== undefined) {
validateInteger(this.start, 'start', 0);

Expand Down
4 changes: 3 additions & 1 deletion lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ const {
isReadable,
isReadableNodeStream,
isReadableFinished,
isReadableErrored,
isWritable,
isWritableNodeStream,
isWritableFinished,
isWritableErrored,
isNodeStream,
willEmitClose: _willEmitClose,
} = require('internal/streams/utils');
Expand Down Expand Up @@ -110,7 +112,7 @@ function eos(stream, options, callback) {
const onclose = () => {
closed = true;

const errored = wState?.errored || rState?.errored;
const errored = isWritableErrored(stream) || isReadableErrored(stream);

if (errored && typeof errored !== 'boolean') {
return callback.call(stream, errored);
Expand Down
18 changes: 14 additions & 4 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1239,13 +1239,23 @@ ObjectDefineProperties(Readable.prototype, {
}
},

readableErrored: {
enumerable: false,
get() {
return this._readableState ? this._readableState.errored : null;
}
},

closed: {
get() {
return this._readableState ? this._readableState.closed : false;
}
},

destroyed: {
enumerable: false,
get() {
if (this._readableState === undefined) {
return false;
}
return this._readableState.destroyed;
return this._readableState ? this._readableState.destroyed : false;
},
set(value) {
// We ignore the value if the stream
Expand Down
30 changes: 30 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,39 @@ function isFinished(stream, opts) {
return true;
}

function isWritableErrored(stream) {
if (!isNodeStream(stream)) {
return null;
}

if (stream.writableErrored) {
return stream.writableErrored;
}

return stream._writableState?.errored ?? null;
}

function isReadableErrored(stream) {
if (!isNodeStream(stream)) {
return null;
}

if (stream.readableErrored) {
return stream.readableErrored;
}

return stream._readableState?.errored ?? null;
}

function isClosed(stream) {
if (!isNodeStream(stream)) {
return null;
}

if (typeof stream.closed === 'boolean') {
return stream.closed;
}

const wState = stream._writableState;
const rState = stream._readableState;

Expand Down Expand Up @@ -226,11 +254,13 @@ module.exports = {
isReadableNodeStream,
isReadableEnded,
isReadableFinished,
isReadableErrored,
isNodeStream,
isWritable,
isWritableNodeStream,
isWritableEnded,
isWritableFinished,
isWritableErrored,
isServerRequest,
isServerResponse,
willEmitClose,
Expand Down
15 changes: 14 additions & 1 deletion lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,12 @@ function finish(stream, state) {

ObjectDefineProperties(Writable.prototype, {

closed: {
get() {
return this._writableState ? this._writableState.closed : false;
}
},

destroyed: {
get() {
return this._writableState ? this._writableState.destroyed : false;
Expand Down Expand Up @@ -846,7 +852,14 @@ ObjectDefineProperties(Writable.prototype, {
get() {
return this._writableState && this._writableState.length;
}
}
},

writableErrored: {
enumerable: false,
get() {
return this._writableState ? this._writableState.errored : null;
}
},
});

const destroy = destroyImpl.destroy;
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-fs-read-stream-inherit.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ const rangeFile = fixtures.path('x.txt');
file.on('error', common.mustCall());

process.on('exit', function() {
assert(!file.closed);
assert(file.closed);
assert(file.destroyed);
});
}
2 changes: 1 addition & 1 deletion test/parallel/test-fs-read-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ if (!common.isWindows) {
file.on('error', common.mustCall());

process.on('exit', function() {
assert(!file.closed);
assert(file.closed);
assert(file.destroyed);
});
}
4 changes: 4 additions & 0 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,10 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
const w = new Writable();
const _err = new Error();
w.destroy(_err);
assert.strictEqual(w.writableErrored, _err);
finished(w, common.mustCall((err) => {
assert.strictEqual(_err, err);
assert.strictEqual(w.closed, true);
finished(w, common.mustCall((err) => {
assert.strictEqual(_err, err);
}));
Expand All @@ -623,7 +625,9 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
{
const w = new Writable();
w.destroy();
assert.strictEqual(w.writableErrored, null);
finished(w, common.mustCall((err) => {
assert.strictEqual(w.closed, true);
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
finished(w, common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
Expand Down
2 changes: 2 additions & 0 deletions test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const assert = require('assert');
read.on('close', common.mustCall());

read.destroy();
assert.strictEqual(read.readableErrored, null);
assert.strictEqual(read.destroyed, true);
}

Expand All @@ -31,6 +32,7 @@ const assert = require('assert');
}));

read.destroy(expected);
assert.strictEqual(read.readableErrored, expected);
assert.strictEqual(read.destroyed, true);
}

Expand Down