diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 4d42e5cd6b660a..3b58c7b9ee232b 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -336,6 +336,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { state.length += len; + // stream._write resets state.length + const ret = state.length < state.highWaterMark; + // We must ensure that previous needDrain will not be reset to false. + if (!ret) + state.needDrain = true; + if (state.writing || state.corked || state.errored) { state.buffered.push({ chunk, encoding, callback }); if (state.allBuffers && encoding !== 'buffer') { @@ -353,12 +359,6 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { state.sync = false; } - const ret = state.length < state.highWaterMark; - - // We must ensure that previous needDrain will not be reset to false. - if (!ret) - state.needDrain = true; - // Return false if errored or destroyed in order to break // any synchronous while(stream.write(data)) loops. return ret && !state.errored && !state.destroyed; diff --git a/test/parallel/test-stream-duplex-readable-end.js b/test/parallel/test-stream-duplex-readable-end.js new file mode 100644 index 00000000000000..ca3ccf63c49474 --- /dev/null +++ b/test/parallel/test-stream-duplex-readable-end.js @@ -0,0 +1,32 @@ +'use strict'; +// https://github.com/nodejs/node/issues/35926 +require('../common'); +const assert = require('assert'); +const stream = require('stream'); + +let loops = 5; + +const src = new stream.Readable({ + read() { + if (loops--) + this.push(Buffer.alloc(20000)); + } +}); + +const dst = new stream.Transform({ + transform(chunk, output, fn) { + this.push(null); + fn(); + } +}); + +src.pipe(dst); + +function parser_end() { + assert.ok(loops > 0); + dst.removeAllListeners(); +} + +dst.on('data', () => { }); +dst.on('end', parser_end); +dst.on('error', parser_end);