-
Notifications
You must be signed in to change notification settings - Fork 29.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: pipeline wait for close before calling the callback #53462
stream: pipeline wait for close before calling the callback #53462
Conversation
Review requested:
|
13d24b0
to
de856a4
Compare
This is different from what I was picturing, and I wonder if it works with a pipeline that has some number of The problem as I see it is that over time, various new places have been added that call diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index bb34759..83c53d8 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -225,6 +225,10 @@ function pipelineImpl(streams, callback, opts) {
finishImpl(err, --finishCount === 0);
}
+ function finishOnlyHandleError(err) {
+ finishImpl(err, false);
+ }
+
function finishImpl(err, final) {
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
error = err;
@@ -279,7 +283,7 @@ function pipelineImpl(streams, callback, opts) {
err.name !== 'AbortError' &&
err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
) {
- finish(err);
+ finishOnlyHandleError(err);
}
}
stream.on('error', onError);
@@ -372,7 +376,7 @@ function pipelineImpl(streams, callback, opts) {
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount += 2;
- const cleanup = pipe(ret, stream, finish, { end });
+ const cleanup = pipe(ret, stream, finish, finishOnlyHandleError, { end });
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
@@ -415,12 +419,12 @@ function pipelineImpl(streams, callback, opts) {
return ret;
}
-function pipe(src, dst, finish, { end }) {
+function pipe(src, dst, finish, finishOnlyHandleError, { end }) {
let ended = false;
dst.on('close', () => {
if (!ended) {
// Finish if the destination closes before the source has completed.
- finish(new ERR_STREAM_PREMATURE_CLOSE());
+ finishOnlyHandleError(new ERR_STREAM_PREMATURE_CLOSE());
}
}); (not tested) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
@wh0 Thanks for spending time and looking into it!
That was a great catch! It WON'T work when I add one or more
I see your point and I agree, overtime it would become harder to work with. And I have also tested it your patch and it works just like expected. I will temporarily mark this PR as draft and take some more time to look into your patch and add additional tests to cover the case where there one or more @mcollina Thanks for looking into it, will bug you for review again once its ready 😄 🙏 |
The pipeline should wait for close event to finish before calling the callback. The `finishCount` should not below 0 when calling finish function. Fixes: nodejs#51540 Co-authored-by: wh0 <wh0@users.noreply.github.com>
de856a4
to
0154e63
Compare
I have added another new test to include a number of @wh0 your fix was clean and way better than my original "monkey patch" fix (there was no issue with your patch, my previous message saying there was a little bit issue was just me waking up without coffee 😄) absolutely learned a lot. I have added you as a co-author please let me know if you are happy with me using your patch. |
I would like to properly review this. Please don't land yet. |
thanks jakecastelli, I'm happy to have this patch used/adapted. please accept it under the same license as the rest of the nodejs project. glad to know this approach works even with the PassThroughs. ronag thanks for your time as well |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this will break legacy user land streams that don't always emit close. Pipeline does not guarantee that close is emitted on all streams before the callback is invoked. The current behavior is intentional. If the docs says otherwise I believe we should fix those instead.
This was mentioned in the comments for the issue: #51540
I appreciate the effort and sorry for the late feedback.
Thanks @ronag! This makes sense, when callback is not called in the Even though the current behaviour feels slightly weird as if there is any async operation in the Thanks again for everyone's time on this, I will close this PR soon 🙏 |
what kind of break is this referring to? e.g. never calling the callback? |
the patch doesn't depend on |
I will take another look. |
@ronag I think that case is handled. |
I think this behavior changed last between 17.2 and 17.3 |
@jakecastelli why is it breaking? @wh0 have you got a reference? |
I've added a "backing-for-lts" label, so we don't backport to v20 for a bit. |
Unless we have an example of something that regressed, I think this can land. |
I don't think is breaking. |
I tested it on 16.x, 17.2, 17.3, 17.x didn't see the behaviour change though
With this patch change - when the callback is not called in the const stream = require("stream");
const src = new stream.Readable();
const dst = new stream.Writable({
write(chunk, encoding, callback) {
callback();
},
destroy(error, callback) {
// takes a while to destroy
setImmediate(() => {
// not calling callback here will result callback not be called in the stream.pipeline
// callback(error);
});
},
});
stream.pipeline(src, dst, (err) => {
console.log(`pipeline done, err=${err.message}, dst.closed=${dst.closed}`);
});
src.destroy(new Error("problem")); |
You always have to call the callback. If you don't it's broken in so many way anyway that I wouldn't consider it a breaking change. |
Ah I see, thanks for confirming this! off topic - I just thought give a mention of this obligatory meme 😄 |
I think we are ready to land @mcollina 👍 |
Landed in 8e5d88b |
The pipeline should wait for close event to finish before calling the callback. The `finishCount` should not below 0 when calling finish function. Fixes: #51540 Co-authored-by: wh0 <wh0@users.noreply.github.com> PR-URL: #53462 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
Notable changes: http: * (SEMVER-MINOR) expose websockets (Natalia Venditto) #53721 module: * add __esModule to require()'d ESM (Joyee Cheung) #52166 path: * (SEMVER-MINOR) add `matchGlob` method (Aviv Keller) #52881 process: * (SEMVER-MINOR) port on-exit-leak-free to core (Vinicius Lourenço) #53239 stream: * (SEMVER-MINOR) pipeline wait for close before calling the callback (jakecastelli) #53462 worker: * (SEMVER-MINOR) add postMessageToThread (Paolo Insogna) #53682 PR-URL: TODO
Notable changes: http: * (SEMVER-MINOR) expose websockets (Natalia Venditto) #53721 module: * add __esModule to require()'d ESM (Joyee Cheung) #52166 path: * (SEMVER-MINOR) add `matchGlob` method (Aviv Keller) #52881 process: * (SEMVER-MINOR) port on-exit-leak-free to core (Vinicius Lourenço) #53239 stream: * (SEMVER-MINOR) pipeline wait for close before calling the callback (jakecastelli) #53462 worker: * (SEMVER-MINOR) add postMessageToThread (Paolo Insogna) #53682 PR-URL: #53826
The pipeline should wait for close event to finish before calling the callback. The `finishCount` should not below 0 when calling finish function. Fixes: #51540 Co-authored-by: wh0 <wh0@users.noreply.github.com> PR-URL: #53462 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
Notable changes: http: * (SEMVER-MINOR) expose websockets (Natalia Venditto) #53721 lib: * (SEMVER-MINOR) add `node:sqlite` module (Colin Ihrig) #53752 module: * add `__esModule` to `require()`'d ESM (Joyee Cheung) #52166 path: * (SEMVER-MINOR) add `matchesGlob` method (Aviv Keller) #52881 process: * (SEMVER-MINOR) port on-exit-leak-free to core (Vinicius Lourenço) #53239 stream: * (SEMVER-MINOR) pipeline wait for close before calling the callback (jakecastelli) #53462 test_runner: * support glob matching coverage files (Aviv Keller) #53553 worker: * (SEMVER-MINOR) add `postMessageToThread` (Paolo Insogna) #53682 PR-URL: #53826
Notable changes: http: * (SEMVER-MINOR) expose websockets (Natalia Venditto) #53721 lib: * (SEMVER-MINOR) add `node:sqlite` module (Colin Ihrig) #53752 module: * add `__esModule` to `require()`'d ESM (Joyee Cheung) #52166 path: * (SEMVER-MINOR) add `matchesGlob` method (Aviv Keller) #52881 process: * (SEMVER-MINOR) port on-exit-leak-free to core (Vinicius Lourenço) #53239 stream: * (SEMVER-MINOR) pipeline wait for close before calling the callback (jakecastelli) #53462 test_runner: * support glob matching coverage files (Aviv Keller) #53553 worker: * (SEMVER-MINOR) add `postMessageToThread` (Paolo Insogna) #53682 PR-URL: #53826
Notable changes: http: * (SEMVER-MINOR) expose websockets (Natalia Venditto) #53721 lib: * (SEMVER-MINOR) add `node:sqlite` module (Colin Ihrig) #53752 module: * add `__esModule` to `require()`'d ESM (Joyee Cheung) #52166 path: * (SEMVER-MINOR) add `matchesGlob` method (Aviv Keller) #52881 process: * (SEMVER-MINOR) port on-exit-leak-free to core (Vinicius Lourenço) #53239 stream: * (SEMVER-MINOR) pipeline wait for close before calling the callback (jakecastelli) #53462 test_runner: * support glob matching coverage files (Aviv Keller) #53553 worker: * (SEMVER-MINOR) add `postMessageToThread` (Paolo Insogna) #53682 PR-URL: #53826
Notable changes: http: * (SEMVER-MINOR) expose websockets (Natalia Venditto) #53721 lib: * (SEMVER-MINOR) add `node:sqlite` module (Colin Ihrig) #53752 module: * add `__esModule` to `require()`'d ESM (Joyee Cheung) #52166 path: * (SEMVER-MINOR) add `matchesGlob` method (Aviv Keller) #52881 process: * (SEMVER-MINOR) port on-exit-leak-free to core (Vinicius Lourenço) #53239 stream: * (SEMVER-MINOR) pipeline wait for close before calling the callback (jakecastelli) #53462 test_runner: * support glob matching coverage files (Aviv Keller) #53553 worker: * (SEMVER-MINOR) add `postMessageToThread` (Paolo Insogna) #53682 PR-URL: #53826
Notable changes: http: * (SEMVER-MINOR) expose websockets (Natalia Venditto) #53721 lib: * (SEMVER-MINOR) add `node:sqlite` module (Colin Ihrig) #53752 module: * add `__esModule` to `require()`'d ESM (Joyee Cheung) #52166 path: * (SEMVER-MINOR) add `matchesGlob` method (Aviv Keller) #52881 process: * (SEMVER-MINOR) port on-exit-leak-free to core (Vinicius Lourenço) #53239 stream: * (SEMVER-MINOR) pipeline wait for close before calling the callback (jakecastelli) #53462 test_runner: * support glob matching coverage files (Aviv Keller) #53553 worker: * (SEMVER-MINOR) add `postMessageToThread` (Paolo Insogna) #53682 PR-URL: #53826
Any chance this is related to npm/cli#7657 ? |
Hi @ljharb based on the discussion in 53904 and a few others I think it is not related to this one. I left my findings in npm/cli#7657 (comment) about |
Notable changes: http: * (SEMVER-MINOR) expose websockets (Natalia Venditto) nodejs#53721 lib: * (SEMVER-MINOR) add `node:sqlite` module (Colin Ihrig) nodejs#53752 module: * add `__esModule` to `require()`'d ESM (Joyee Cheung) nodejs#52166 path: * (SEMVER-MINOR) add `matchesGlob` method (Aviv Keller) nodejs#52881 process: * (SEMVER-MINOR) port on-exit-leak-free to core (Vinicius Lourenço) nodejs#53239 stream: * (SEMVER-MINOR) pipeline wait for close before calling the callback (jakecastelli) nodejs#53462 test_runner: * support glob matching coverage files (Aviv Keller) nodejs#53553 worker: * (SEMVER-MINOR) add `postMessageToThread` (Paolo Insogna) nodejs#53682 PR-URL: nodejs#53826
pipeline doc: here
The pipeline should wait for close event to finish before calling the callback as that should be the proper "fully done" state.
The
finishCount
should not below 0 when callingfinishImpl
function (should not be negative).Fixes: #51540