Skip to content

Commit

Permalink
stream: forward errored to callback
Browse files Browse the repository at this point in the history
Refs: #39356

PR-URL: #39364
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
ronag committed Jul 14, 2021
1 parent 8306051 commit efd40ea
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 22 deletions.
4 changes: 2 additions & 2 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,12 @@ function errorBuffer(state) {
const { chunk, callback } = state.buffered[n];
const len = state.objectMode ? 1 : chunk.length;
state.length -= len;
callback(new ERR_STREAM_DESTROYED('write'));
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
}

const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](new ERR_STREAM_DESTROYED('end'));
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
}

resetBuffer(state);
Expand Down
30 changes: 24 additions & 6 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -351,33 +351,35 @@ const assert = require('assert');
const write = new Writable({
write(chunk, enc, cb) { process.nextTick(cb); }
});
const _err = new Error('asd');
write.once('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
write.end('asd', common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
assert.strictEqual(err, _err);
}));
write.destroy(new Error('asd'));
write.destroy(_err);
}

{
// Call buffered write callback with error

const _err = new Error('asd');
const write = new Writable({
write(chunk, enc, cb) {
process.nextTick(cb, new Error('asd'));
process.nextTick(cb, _err);
},
autoDestroy: false
});
write.cork();
write.write('asd', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
assert.strictEqual(err, _err);
}));
write.write('asd', common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
assert.strictEqual(err, _err);
}));
write.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
assert.strictEqual(err, _err);
}));
write.uncork();
}
Expand Down Expand Up @@ -471,3 +473,19 @@ const assert = require('assert');
write.destroy();
write.destroy();
}

{
// https://github.com/nodejs/node/issues/39356
const s = new Writable({
final() {}
});
const _err = new Error('oh no');
// Remove `callback` and it works
s.end(common.mustCall((err) => {
assert.strictEqual(err, _err);
}));
s.on('error', common.mustCall((err) => {
assert.strictEqual(err, _err);
}));
s.destroy(_err);
}
19 changes: 7 additions & 12 deletions test/parallel/test-stream-writable-end-cb-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ const stream = require('stream');
// Invoke end callback on failure.
const writable = new stream.Writable();

const _err = new Error('kaboom');
writable._write = (chunk, encoding, cb) => {
process.nextTick(cb, new Error('kaboom'));
process.nextTick(cb, _err);
};

writable.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(err, _err);
}));
writable.write('asd');
writable.end(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
assert.strictEqual(err, _err);
}));
writable.end(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
assert.strictEqual(err, _err);
}));
}

Expand Down Expand Up @@ -57,18 +58,12 @@ const stream = require('stream');
}
});
w.end('testing ended state', common.mustCall((err) => {
// This errors since .destroy(err), which is invoked by errors
// in same tick below, will error all pending callbacks.
// Does this make sense? Not sure.
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
}));
assert.strictEqual(w.destroyed, false);
assert.strictEqual(w.writableEnded, true);
w.end(common.mustCall((err) => {
// This errors since .destroy(err), which is invoked by errors
// in same tick below, will error all pending callbacks.
// Does this make sense? Not sure.
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
}));
assert.strictEqual(w.destroyed, false);
assert.strictEqual(w.writableEnded, true);
Expand Down
5 changes: 3 additions & 2 deletions test/parallel/test-stream-writable-end-cb-uncaught.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ process.on('uncaughtException', common.mustCall((err) => {
}));

const writable = new stream.Writable();
const _err = new Error('kaboom');

writable._write = (chunk, encoding, cb) => {
cb();
};
writable._final = (cb) => {
cb(new Error('kaboom'));
cb(_err);
};

writable.write('asd');
writable.end(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
assert.strictEqual(err, _err);
}));

0 comments on commit efd40ea

Please sign in to comment.