Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: writable write emit all errors async #29744

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1851,7 +1851,8 @@ methods only.
The `callback` method must be called to signal either that the write completed
successfully or failed with an error. The first argument passed to the
`callback` must be the `Error` object if the call failed or `null` if the
write succeeded.
write succeeded. The `callback` method will always be called asynchronously and
before `'error'` is emitted.

All calls to `writable.write()` that occur between the time `writable._write()`
is called and the `callback` is called will cause the written data to be
Expand Down
81 changes: 32 additions & 49 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -265,33 +265,6 @@ Writable.prototype.pipe = function() {
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
};


function writeAfterEnd(stream, cb) {
const er = new ERR_STREAM_WRITE_AFTER_END();
// TODO: defer error events consistently everywhere, not just the cb
errorOrDestroy(stream, er);
process.nextTick(cb, er);
}

// Checks that a user-supplied chunk is valid, especially for the particular
// mode the stream is in. Currently this means that `null` is never accepted
// and undefined/non-string values are only allowed in object mode.
function validChunk(stream, state, chunk, cb) {
var er;

if (chunk === null) {
er = new ERR_STREAM_NULL_VALUES();
} else if (typeof chunk !== 'string' && !state.objectMode) {
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
}
if (er) {
errorOrDestroy(stream, er);
process.nextTick(cb, er);
return false;
}
return true;
}

Writable.prototype.write = function(chunk, encoding, cb) {
const state = this._writableState;
var ret = false;
Expand All @@ -315,17 +288,25 @@ Writable.prototype.write = function(chunk, encoding, cb) {
if (typeof cb !== 'function')
cb = nop;

let err;
if (state.ending) {
writeAfterEnd(this, cb);
err = new ERR_STREAM_WRITE_AFTER_END();
} else if (state.destroyed) {
const err = new ERR_STREAM_DESTROYED('write');
process.nextTick(cb, err);
errorOrDestroy(this, err);
} else if (isBuf || validChunk(this, state, chunk, cb)) {
err = new ERR_STREAM_DESTROYED('write');
} else if (chunk === null) {
err = new ERR_STREAM_NULL_VALUES();
} else if (!isBuf && typeof chunk !== 'string' && !state.objectMode) {
err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
} else {
state.pendingcb++;
ret = writeOrBuffer(this, state, chunk, encoding, cb);
}

if (err) {
process.nextTick(cb, err);
errorOrDestroy(this, err, true);
}

return ret;
};

Expand Down Expand Up @@ -629,7 +610,7 @@ Writable.prototype._write = function(chunk, encoding, cb) {
if (this._writev) {
this._writev([{ chunk, encoding }], cb);
} else {
cb(new ERR_METHOD_NOT_IMPLEMENTED('_write()'));
process.nextTick(cb, new ERR_METHOD_NOT_IMPLEMENTED('_write()'));
}
};

Expand All @@ -656,15 +637,25 @@ Writable.prototype.end = function(chunk, encoding, cb) {
this.uncork();
}

if (typeof cb !== 'function')
cb = nop;

// Ignore unnecessary end() calls.
if (!state.ending) {
// TODO(ronag): Compat. Allow end() after destroy().
if (!state.errored && !state.ending) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So now if someone calls .end() after an error stream won't have state.ended \ state.ending set up and therefore stream.writableEnded and explicit through state checks will remain false IIUC (due to !state.errored check here), is that ok?

Copy link
Member Author

@ronag ronag Dec 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that ok?

I think it should be ok since the stream is .destroyed here. In my opinion it shouldn't be able to be ended ever once .destroyed. However, that's more breaking and .errored is a compromise. i.e. how can a end() succeed when the stream has already failed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, didn't think about the destroy. Could you please add a comment about this in the docs? (probably in _write error case, 'error' event and in the .end() itself IIUC).

Copy link
Member Author

@ronag ronag Dec 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer not to add anything about this in the docs until the TODO's are resolved and/or we are consistent. The behaviour here is already undocumented and inconsistent.

I don't think this change will affect anyone in practice.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense since we might change this behavior though how quickly do you think we would be able to resolve these? Since in the case of streams such inconsistencies may hang around for quite a lot of time and IMO they better be documented in that case.

Though I completely agree with it shouldn't be able to be ended ever once .destroyed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense since we might change this behavior though how quickly do you think we would be able to resolve these?

No idea. I'm trying to get it done in some fashion as soon as possible. Realistically, maybe sometime next year?

To be honest part of my disagreement is that I'm unsure how to and where to document this in order for it not to simply cause more confusion.

endWritable(this, state, cb);
} else if (typeof cb === 'function') {
if (!state.finished) {
onFinished(this, state, cb);
} else {
cb(new ERR_STREAM_ALREADY_FINISHED('end'));
}
} else if (state.finished) {
const err = new ERR_STREAM_ALREADY_FINISHED('end');
process.nextTick(cb, err);
// TODO(ronag): Compat. Don't error the stream.
// errorOrDestroy(this, err, true);
} else if (state.destroyed) {
const err = new ERR_STREAM_DESTROYED('end');
process.nextTick(cb, err);
// TODO(ronag): Compat. Don't error the stream.
// errorOrDestroy(this, err, true);
} else if (cb !== nop) {
onFinished(this, state, cb);
ronag marked this conversation as resolved.
Show resolved Hide resolved
}

return this;
Expand Down Expand Up @@ -749,7 +740,7 @@ function finish(stream, state) {
function endWritable(stream, state, cb) {
state.ending = true;
finishMaybe(stream, state, true);
if (cb) {
if (cb !== nop) {
if (state.finished)
process.nextTick(cb);
else
Expand All @@ -774,14 +765,6 @@ function onCorkedFinish(corkReq, state, err) {
}

function onFinished(stream, state, cb) {
if (state.destroyed && state.errorEmitted) {
BridgeAR marked this conversation as resolved.
Show resolved Hide resolved
// TODO(ronag): Backwards compat. Should be moved to end() without
// errorEmitted check and with errorOrDestroy.
const err = new ERR_STREAM_DESTROYED('end');
process.nextTick(cb, err);
return;
}

function onerror(err) {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
Expand Down
9 changes: 7 additions & 2 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ function undestroy() {
}
}

function errorOrDestroy(stream, err) {
function errorOrDestroy(stream, err, sync) {
ronag marked this conversation as resolved.
Show resolved Hide resolved
// We have tests that rely on errors being emitted
// in the same tick, so changing this is semver major.
// For now when you opt-in to autoDestroy we allow
Expand All @@ -138,7 +138,12 @@ function errorOrDestroy(stream, err) {
if (r) {
r.errored = true;
}
emitErrorNT(stream, err);

if (sync) {
process.nextTick(emitErrorNT, stream, err);
} else {
emitErrorNT(stream, err);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-child-process-server-close.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const server = net.createServer((conn) => {
}));
}).listen(common.PIPE, () => {
const client = net.connect(common.PIPE, common.mustCall());
client.on('data', () => {
client.once('data', () => {
client.end(() => {
server.close();
});
Expand Down
23 changes: 8 additions & 15 deletions test/parallel/test-file-write-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// USE OR OTHER DEALINGS IN THE SOFTWARE.

'use strict';
require('../common');
const common = require('../common');
const assert = require('assert');

const path = require('path');
Expand All @@ -46,9 +46,6 @@ file
callbacks.open++;
assert.strictEqual(typeof fd, 'number');
})
.on('error', function(err) {
throw err;
})
.on('drain', function() {
console.error('drain!', callbacks.drain);
callbacks.drain++;
Expand All @@ -65,17 +62,13 @@ file
assert.strictEqual(file.bytesWritten, EXPECTED.length * 2);

callbacks.close++;
assert.throws(
() => {
console.error('write after end should not be allowed');
file.write('should not work anymore');
},
{
code: 'ERR_STREAM_WRITE_AFTER_END',
name: 'Error',
message: 'write after end'
}
);
console.error('write after end should not be allowed');
file.write('should not work anymore');
file.on('error', common.expectsError({
code: 'ERR_STREAM_WRITE_AFTER_END',
name: 'Error',
message: 'write after end'
}));

fs.unlinkSync(fn);
});
Expand Down
16 changes: 9 additions & 7 deletions test/parallel/test-net-socket-write-error.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
'use strict';

require('../common');
const assert = require('assert');
const common = require('../common');
const net = require('net');

const server = net.createServer().listen(0, connectToServer);

function connectToServer() {
const client = net.createConnection(this.address().port, () => {
assert.throws(() => client.write(1337),
{
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError'
});
client.write(1337, common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError'
}));
client.on('error', common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError'
}));

client.destroy();
})
Expand Down
18 changes: 11 additions & 7 deletions test/parallel/test-net-write-arguments.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const net = require('net');

const socket = net.Stream({ highWaterMark: 0 });

// Make sure that anything besides a buffer or a string throws.
assert.throws(() => socket.write(null),
{
code: 'ERR_STREAM_NULL_VALUES',
name: 'TypeError',
message: 'May not write null values to stream'
});
socket.write(null, common.expectsError({
code: 'ERR_STREAM_NULL_VALUES',
name: 'TypeError',
message: 'May not write null values to stream'
}));
socket.on('error', common.expectsError({
code: 'ERR_STREAM_NULL_VALUES',
name: 'TypeError',
message: 'May not write null values to stream'
}));

[
true,
false,
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ const assert = require('assert');
write.destroy();
let ticked = false;
write.end(common.mustCall((err) => {
assert.strictEqual(ticked, false);
assert.strictEqual(ticked, true);
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
}));
ticked = true;
Expand Down
4 changes: 3 additions & 1 deletion test/parallel/test-stream-writable-end-multiple.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ const assert = require('assert');
const stream = require('stream');

const writable = new stream.Writable();

writable._write = (chunk, encoding, cb) => {
setTimeout(() => cb(), 10);
};

writable.end('testing ended state', common.mustCall());
writable.end(common.mustCall());
writable.on('finish', common.mustCall(() => {
let ticked = false;
writable.end(common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
}));
ticked = true;
}));
30 changes: 13 additions & 17 deletions test/parallel/test-stream-writable-null.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict';
require('../common');
const common = require('../common');
const assert = require('assert');

const stream = require('stream');
Expand All @@ -11,33 +11,29 @@ class MyWritable extends stream.Writable {
}
}

assert.throws(
() => {
const m = new MyWritable({ objectMode: true });
m.write(null, (err) => assert.ok(err));
},
{
{
const m = new MyWritable({ objectMode: true });
m.write(null, (err) => assert.ok(err));
m.on('error', common.expectsError({
code: 'ERR_STREAM_NULL_VALUES',
name: 'TypeError',
message: 'May not write null values to stream'
}
);
}));
}

{ // Should not throw.
const m = new MyWritable({ objectMode: true }).on('error', assert);
m.write(null, assert);
}

assert.throws(
() => {
const m = new MyWritable();
m.write(false, (err) => assert.ok(err));
},
{
{
const m = new MyWritable();
m.write(false, (err) => assert.ok(err));
m.on('error', common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError'
}
);
}));
}

{ // Should not throw.
const m = new MyWritable().on('error', assert);
Expand Down
Loading