diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index aad43823f9afb9..4742391fd71a7a 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -63,15 +63,27 @@ function eos(stream, opts, callback) { const wState = stream._writableState; const rState = stream._readableState; + const state = wState || rState; const onlegacyfinish = () => { if (!stream.writable) onfinish(); }; + // TODO (ronag): Improve soft detection to include core modules and + // common ecosystem modules that do properly emit 'close' but fail + // this generic check. + const willEmitClose = ( + state && + state.autoDestroy && + state.emitClose && + state.closed === false + ); + let writableFinished = stream.writableFinished || (wState && wState.finished); const onfinish = () => { writableFinished = true; + if (willEmitClose && (!stream.readable || readable)) return; if (!readable || readableEnded) callback.call(stream); }; @@ -79,6 +91,7 @@ function eos(stream, opts, callback) { (rState && rState.endEmitted); const onend = () => { readableEnded = true; + if (willEmitClose && (!stream.writable || writable)) return; if (!writable || writableFinished) callback.call(stream); }; diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index c2b454e80b38af..b273fddfa3b613 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -7,7 +7,8 @@ const { Readable, Transform, pipeline, - PassThrough + PassThrough, + Duplex } = require('stream'); const assert = require('assert'); const http = require('http'); @@ -1077,3 +1078,43 @@ const { promisify } = require('util'); assert.ifError(err); })); } + +{ + let closed = false; + const src = new Readable({ + read() {}, + destroy(err, cb) { + process.nextTick(cb); + } + }); + const dst = new Writable({ + write(chunk, encoding, callback) { + callback(); + } + }); + src.on('close', () => { + closed = true; + }); + src.push(null); + pipeline(src, dst, common.mustCall((err) => { + assert.strictEqual(closed, true); + })); +} + +{ + let closed = false; + const src = new Readable({ + read() {}, + destroy(err, cb) { + process.nextTick(cb); + } + }); + const dst = new Duplex({}); + src.on('close', common.mustCall(() => { + closed = true; + })); + src.push(null); + pipeline(src, dst, common.mustCall((err) => { + assert.strictEqual(closed, true); + })); +}