From 0f07abc80d7e1e86de28ea13d939054cebd69e7c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 11 Jul 2022 12:25:05 +0200 Subject: [PATCH] stream: finish pipeline if dst closes before src If the destination stream is closed before the source has completed the pipeline should finnish with premature close. Fixes: https://github.com/nodejs/node/issues/43682 PR-URL: https://github.com/nodejs/node/pull/43701 Reviewed-By: Matteo Collina Reviewed-By: Luigi Pinca --- lib/internal/streams/pipeline.js | 14 ++++++++++++- test/parallel/test-stream-pipeline-duplex.js | 21 ++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-stream-pipeline-duplex.js diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index a3f69c2099ace1..77520a14d50a6f 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -20,6 +20,7 @@ const { ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED, + ERR_STREAM_PREMATURE_CLOSE, }, AbortError, } = require('internal/errors'); @@ -344,13 +345,24 @@ function pipelineImpl(streams, callback, opts) { } function pipe(src, dst, finish, { end }) { + let ended = false; + dst.on('close', () => { + if (!ended) { + // Finish if the destination closes before the source has completed. + finish(new ERR_STREAM_PREMATURE_CLOSE()); + } + }); + src.pipe(dst, { end }); if (end) { // Compat. Before node v10.12.0 stdio used to throw an error so // pipe() did/does not end() stdio destinations. // Now they allow it but "secretly" don't close the underlying fd. - src.once('end', () => dst.end()); + src.once('end', () => { + ended = true; + dst.end(); + }); } else { finish(); } diff --git a/test/parallel/test-stream-pipeline-duplex.js b/test/parallel/test-stream-pipeline-duplex.js new file mode 100644 index 00000000000000..0dbd27a7174dc9 --- /dev/null +++ b/test/parallel/test-stream-pipeline-duplex.js @@ -0,0 +1,21 @@ +'use strict'; + +const common = require('../common'); +const { pipeline, Duplex, PassThrough } = require('stream'); +const assert = require('assert'); + +const remote = new PassThrough(); +const local = new Duplex({ + read() {}, + write(chunk, enc, callback) { + callback(); + } +}); + +pipeline(remote, local, remote, common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); +})); + +setImmediate(() => { + remote.end(); +});