From 9f4a28fa6a54f3b124c975d575feffb7ff7769d0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 24 Aug 2019 16:33:46 +0200 Subject: [PATCH 1/6] stream: always invoke callback before emitting error Ensure the callback is always invoked before emitting the error in both sync and async case. --- doc/api/stream.md | 3 +- lib/_stream_writable.js | 48 ++++++++++----- lib/internal/streams/destroy.js | 11 +++- test/parallel/test-http2-reset-flood.js | 5 +- .../test-stream-writable-write-cb-error.js | 58 +++++++++++++++++++ ...est-stream-writable-write-writev-finish.js | 2 +- .../test-wrap-js-stream-exceptions.js | 6 +- test/parallel/test-zlib-write-after-close.js | 14 ++--- 8 files changed, 119 insertions(+), 28 deletions(-) create mode 100644 test/parallel/test-stream-writable-write-cb-error.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 1f0f08a7b86a7f..274f861d8c563d 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -571,7 +571,8 @@ The `writable.write()` method writes some data to the stream, and calls the supplied `callback` once the data has been fully handled. If an error occurs, the `callback` *may or may not* be called with the error as its first argument. To reliably detect write errors, add a listener for the -`'error'` event. +`'error'` event. If `callback` is called with an error, it will be called +before the `'error'` event is emitted. The return value is `true` if the internal buffer is less than the `highWaterMark` configured when the stream was created after admitting `chunk`. diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 4cb3be5c008e9e..07b5291c59d467 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -47,7 +47,7 @@ const { ERR_UNKNOWN_ENCODING } = require('internal/errors').codes; -const { errorOrDestroy } = destroyImpl; +const { errorOrDestroy, errorMaybe } = destroyImpl; Object.setPrototypeOf(Writable.prototype, Stream.prototype); Object.setPrototypeOf(Writable, Stream); @@ -155,6 +155,11 @@ function WritableState(options, stream, isDuplex) { // Should .destroy() be called after 'finish' (and potentially 'end') this.autoDestroy = !!options.autoDestroy; + // Indicates whether the stream has errored. When true all write() calls + // should return false. This is needed since when autoDestroy + // is disabled we need a way to tell whether the stream has failed. + this.errored = false; + // Count buffered requests this.bufferedRequestCount = 0; @@ -394,7 +399,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { if (!ret) state.needDrain = true; - if (state.writing || state.corked) { + if (state.writing || state.corked || state.errored) { var last = state.lastBufferedRequest; state.lastBufferedRequest = { chunk, @@ -413,7 +418,9 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { doWrite(stream, state, false, len, chunk, encoding, cb); } - return ret; + // Return false if errored or destroyed in order to break + // any synchronous while(stream.write(data)) loops. + return ret && !state.errored && !state.destroyed; } function doWrite(stream, state, writev, len, chunk, encoding, cb) { @@ -431,18 +438,19 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { } function onwriteError(stream, state, sync, er, cb) { - --state.pendingcb; - if (sync) { - // Defer the callback if we are being called synchronously - // to avoid piling up things on the stack - process.nextTick(cb, er); + process.nextTick(onwriteErrorNT, stream, state, er, cb); } else { - // The caller expect this to happen before if - // it is async - cb(er); + onwriteErrorNT(stream, state, er, cb); } - errorOrDestroy(stream, er); +} + +function onwriteErrorNT(stream, state, er, cb) { + --state.pendingcb; + + cb(er); + // This can emit error, but error must always follow cb. + errorMaybe(stream, er); } function onwrite(stream, er) { @@ -458,9 +466,19 @@ function onwrite(stream, er) { state.length -= state.writelen; state.writelen = 0; - if (er) - onwriteError(stream, state, sync, er, cb); - else { + if (er) { + state.errored = true; + if (state.autoDestroy) { + stream.destroy(er, (err) => { + // TODO(ronag): Minor optimization opportunities: + // - We might no longer be sync here. + // - Closure allocation can probably be optimized away. + onwriteError(stream, state, sync, err, cb); + }); + } else { + onwriteError(stream, state, sync, er, cb); + } + } else { // Check if we're actually ready to finish, but don't emit yet var finished = needFinish(state) || stream.destroyed; diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 27985482cee70b..2f60049f96a4d8 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -50,10 +50,12 @@ function destroy(err, cb) { this._destroy(err || null, (err) => { const emitClose = (w && w.emitClose) || (r && r.emitClose); if (cb) { + // Invoke callback before scheduling emitClose so that callback + // can schedule before. + cb(err); if (emitClose) { process.nextTick(emitCloseNT, this); } - cb(err); } else if (needError(this, err)) { process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err); } else if (emitClose) { @@ -91,6 +93,7 @@ function undestroy() { if (w) { w.destroyed = false; + w.errored = false; w.ended = false; w.ending = false; w.finalCalled = false; @@ -116,9 +119,15 @@ function errorOrDestroy(stream, err) { stream.emit('error', err); } +function errorMaybe(stream, err) { + if (needError(stream, err)) { + stream.emit('error', err); + } +} module.exports = { destroy, undestroy, + errorMaybe, errorOrDestroy }; diff --git a/test/parallel/test-http2-reset-flood.js b/test/parallel/test-http2-reset-flood.js index a6553401fbb6e7..9977bfd1a3e669 100644 --- a/test/parallel/test-http2-reset-flood.js +++ b/test/parallel/test-http2-reset-flood.js @@ -67,7 +67,10 @@ const worker = new Worker(__filename).on('message', common.mustCall((port) => { h2header.writeIntBE(1, 0, 3); // Length: 1 h2header.writeIntBE(i, 5, 4); // Stream ID // 0x88 = :status: 200 - conn.write(Buffer.concat([h2header, Buffer.from([0x88])])); + if (!conn.write(Buffer.concat([h2header, Buffer.from([0x88])]))) { + process.nextTick(writeRequests); + break; + } } } diff --git a/test/parallel/test-stream-writable-write-cb-error.js b/test/parallel/test-stream-writable-write-cb-error.js new file mode 100644 index 00000000000000..72db1b7e3ffe70 --- /dev/null +++ b/test/parallel/test-stream-writable-write-cb-error.js @@ -0,0 +1,58 @@ +'use strict'; +const common = require('../common'); +const { Writable } = require('stream'); +const assert = require('assert'); + +// Ensure callback is always invoked before +// error is emitted. Regardless if error was +// sync or async. + +{ + let callbackCalled = false; + // Sync Error + const writable = new Writable({ + write: common.mustCall((buf, enc, cb) => { + cb(new Error()); + }) + }); + writable.on('error', common.mustCall(() => { + assert.strictEqual(callbackCalled, true); + })); + writable.write('hi', common.mustCall(() => { + callbackCalled = true; + })); +} + +{ + let callbackCalled = false; + // Async Error + const writable = new Writable({ + write: common.mustCall((buf, enc, cb) => { + process.nextTick(cb, new Error()); + }) + }); + writable.on('error', common.mustCall(() => { + assert.strictEqual(callbackCalled, true); + })); + writable.write('hi', common.mustCall(() => { + callbackCalled = true; + })); +} + +{ + // Sync Error + const writable = new Writable({ + write: common.mustCall((buf, enc, cb) => { + cb(new Error()); + }) + }); + + writable.on('error', common.mustCall()); + + let cnt = 0; + // Ensure we don't live lock on sync error + while (writable.write('a')) + cnt++; + + assert.strictEqual(cnt, 0); +} diff --git a/test/parallel/test-stream-writable-write-writev-finish.js b/test/parallel/test-stream-writable-write-writev-finish.js index aa43b1490c8600..62282b14836820 100644 --- a/test/parallel/test-stream-writable-write-writev-finish.js +++ b/test/parallel/test-stream-writable-write-writev-finish.js @@ -117,7 +117,7 @@ const stream = require('stream'); const ws = new stream.Writable(); - ws.on('finish', common.mustNotCall()); + ws.on('finish', common.mustCall()); ws.on('error', common.mustCall()); ws._write = (chunk, encoding, done) => { diff --git a/test/parallel/test-wrap-js-stream-exceptions.js b/test/parallel/test-wrap-js-stream-exceptions.js index cde7c178446a11..d6c9e5c75cea69 100644 --- a/test/parallel/test-wrap-js-stream-exceptions.js +++ b/test/parallel/test-wrap-js-stream-exceptions.js @@ -16,4 +16,8 @@ const socket = new JSStreamWrap(new Duplex({ }) })); -assert.throws(() => socket.end('foo'), /Error: write EPROTO/); +socket.end('foo'); +socket.on('error', common.expectsError({ + type: Error, + message: 'write EPROTO' +})); diff --git a/test/parallel/test-zlib-write-after-close.js b/test/parallel/test-zlib-write-after-close.js index 160971b16bc30c..24d1e9b9901a94 100644 --- a/test/parallel/test-zlib-write-after-close.js +++ b/test/parallel/test-zlib-write-after-close.js @@ -26,12 +26,10 @@ const zlib = require('zlib'); zlib.gzip('hello', common.mustCall(function(err, out) { const unzip = zlib.createGunzip(); unzip.close(common.mustCall()); - common.expectsError( - () => unzip.write(out), - { - code: 'ERR_STREAM_DESTROYED', - type: Error, - message: 'Cannot call write after a stream was destroyed' - } - ); + + unzip.write(out); + unzip.on('error', common.expectsError({ + code: 'ERR_STREAM_DESTROYED', + type: Error + })); })); From 1cd617b80e0bd7f01b5fc8374a5f93ffa6460a08 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 20 Sep 2019 18:33:02 +0200 Subject: [PATCH 2/6] fixup: no finish or prefinish on errored --- lib/_stream_writable.js | 2 +- test/parallel/test-stream-writable-write-writev-finish.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 07b5291c59d467..d6606a1dd0cd1d 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -629,7 +629,7 @@ Object.defineProperty(Writable.prototype, 'writableLength', { function needFinish(state) { return (state.ending && state.length === 0 && - !state.errorEmitted && + !state.errored && state.bufferedRequest === null && !state.finished && !state.writing); diff --git a/test/parallel/test-stream-writable-write-writev-finish.js b/test/parallel/test-stream-writable-write-writev-finish.js index 62282b14836820..aa43b1490c8600 100644 --- a/test/parallel/test-stream-writable-write-writev-finish.js +++ b/test/parallel/test-stream-writable-write-writev-finish.js @@ -117,7 +117,7 @@ const stream = require('stream'); const ws = new stream.Writable(); - ws.on('finish', common.mustCall()); + ws.on('finish', common.mustNotCall()); ws.on('error', common.mustCall()); ws._write = (chunk, encoding, done) => { From 5da8563bae3eaa3bdced04fac1a2182da878aac2 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 20 Sep 2019 18:41:28 +0200 Subject: [PATCH 3/6] fixup: simplify --- lib/_stream_writable.js | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index d6606a1dd0cd1d..9211e03825d6b0 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -437,20 +437,12 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.sync = false; } -function onwriteError(stream, state, sync, er, cb) { - if (sync) { - process.nextTick(onwriteErrorNT, stream, state, er, cb); - } else { - onwriteErrorNT(stream, state, er, cb); - } -} - -function onwriteErrorNT(stream, state, er, cb) { +function onwriteError(stream, state, er, cb) { --state.pendingcb; cb(er); // This can emit error, but error must always follow cb. - errorMaybe(stream, er); + errorOrDestroy(stream, er); } function onwrite(stream, er) { @@ -468,15 +460,10 @@ function onwrite(stream, er) { if (er) { state.errored = true; - if (state.autoDestroy) { - stream.destroy(er, (err) => { - // TODO(ronag): Minor optimization opportunities: - // - We might no longer be sync here. - // - Closure allocation can probably be optimized away. - onwriteError(stream, state, sync, err, cb); - }); + if (sync) { + process.nextTick(onwriteError, stream, state, er, cb); } else { - onwriteError(stream, state, sync, er, cb); + onwriteError(stream, state, er, cb); } } else { // Check if we're actually ready to finish, but don't emit yet From 14428f3e79eea2eae1a78e8720c3a499d4866632 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 20 Sep 2019 19:11:52 +0200 Subject: [PATCH 4/6] fixup: no finish after destroy error --- lib/internal/streams/destroy.js | 8 ++++++++ test/parallel/test-stream-writable-destroy.js | 14 ++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 2f60049f96a4d8..69e63169d48626 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -27,6 +27,10 @@ function destroy(err, cb) { const r = this._readableState; const w = this._writableState; + if (w && err) { + w.errored = true; + } + if ((w && w.destroyed) || (r && r.destroyed)) { if (cb) { cb(err); @@ -113,6 +117,10 @@ function errorOrDestroy(stream, err) { const r = stream._readableState; const w = stream._writableState; + if (w & err) { + w.errored = true; + } + if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err); else if (needError(stream, err)) diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index a431d6d48d1c8e..f08b755a95631b 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -16,6 +16,20 @@ const assert = require('assert'); assert.strictEqual(write.destroyed, true); } +{ + const write = new Writable({ + write(chunk, enc, cb) { + this.destroy(new Error('asd')); + cb(); + } + }); + + write.on('error', common.mustCall()); + write.on('finish', common.mustNotCall()); + write.end('asd'); + assert.strictEqual(write.destroyed, true); +} + { const write = new Writable({ write(chunk, enc, cb) { cb(); } From eb9099bb1300ad930d2d516cc5e089643ad5dd9b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 20 Sep 2019 19:14:20 +0200 Subject: [PATCH 5/6] fixup: remove unused errorMaybe --- lib/_stream_writable.js | 2 +- lib/internal/streams/destroy.js | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 9211e03825d6b0..fe24abb0f43435 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -47,7 +47,7 @@ const { ERR_UNKNOWN_ENCODING } = require('internal/errors').codes; -const { errorOrDestroy, errorMaybe } = destroyImpl; +const { errorOrDestroy } = destroyImpl; Object.setPrototypeOf(Writable.prototype, Stream.prototype); Object.setPrototypeOf(Writable, Stream); diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 69e63169d48626..8708ca022c7f8d 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -127,15 +127,9 @@ function errorOrDestroy(stream, err) { stream.emit('error', err); } -function errorMaybe(stream, err) { - if (needError(stream, err)) { - stream.emit('error', err); - } -} module.exports = { destroy, undestroy, - errorMaybe, errorOrDestroy }; From 60dac5221d682ec78ae91664a49d045bef7df98d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 28 Sep 2019 16:11:26 +0200 Subject: [PATCH 6/6] fixup: remove no longer necessary test change --- test/parallel/test-zlib-write-after-close.js | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/test/parallel/test-zlib-write-after-close.js b/test/parallel/test-zlib-write-after-close.js index 24d1e9b9901a94..160971b16bc30c 100644 --- a/test/parallel/test-zlib-write-after-close.js +++ b/test/parallel/test-zlib-write-after-close.js @@ -26,10 +26,12 @@ const zlib = require('zlib'); zlib.gzip('hello', common.mustCall(function(err, out) { const unzip = zlib.createGunzip(); unzip.close(common.mustCall()); - - unzip.write(out); - unzip.on('error', common.expectsError({ - code: 'ERR_STREAM_DESTROYED', - type: Error - })); + common.expectsError( + () => unzip.write(out), + { + code: 'ERR_STREAM_DESTROYED', + type: Error, + message: 'Cannot call write after a stream was destroyed' + } + ); }));