-
Notifications
You must be signed in to change notification settings - Fork 29.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream.pipeline does not wait for the last stream to flush before calling the final callback #34274
Comments
Hi, Aravindan. Thanks for your feedback, I am trying to locate this problem, and I reproduced this problem in v14.5.0. |
@rickyes Just checked, I too get this in both 12.8.0 and 14.5.0. Let me know if you need anything else from me. |
are you able to simplify the repro case? |
@ronag here you go: const stream = require('stream');
const makeStream = (i) =>
new stream.Transform({
transform: (chunk, enc, cb) => cb(null, chunk),
flush: (cb) =>
setTimeout(() => {
console.log('done flushing', i);
cb(null);
}),
});
const input = new stream.Readable();
stream.pipeline(
input,
makeStream(1),
makeStream(2),
makeStream(3),
() => console.log('done!'),
);
input.push('test');
input.push(null); Prints: done flushing 1
done flushing 2
done!
done flushing 3 |
I think I'm getting close to the problem and check again tonight. |
Simplest repro: const stream = require('stream');
const makeStream = (i) =>
new stream.Transform({
transform: (chunk, enc, cb) => cb(null, chunk),
flush: (cb) =>
setTimeout(() => {
console.log('done flushing', i);
cb(null);
}),
});
const input = new stream.Readable();
input.push(null);
stream.pipeline(
input,
makeStream(1),
() => console.log('done!'),
); |
Due to compat reasons Transform streams don't always wait for flush to complete before finishing the stream. Try to wait when possible, i.e. when the user does not override _final. Fixes: nodejs#34274
I think I found a solution. |
Due to compat reasons Transform streams don't always wait for flush to complete before finishing the stream. Try to wait when possible, i.e. when the user does not override _final. Fixes: #34274 PR-URL: #34314 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Zeyu Yang <himself65@outlook.com>
I just reproduced this issue in Node.js 14.15.4 but I see it's fixed in 15.5.1 |
The simplest workaround seems to declare a promise somewhere outside, assign a value in |
* chore(stream-transform-from): create directory for `@sounisi5011/stream-transform-from` package * test(stream-transform-from): add tests * test(stream-transform-from): remove the `createPipeline()` function from test code * feat(stream-transform-from): add a type definition for the public API * feat(stream-transform-from): add `transformFrom()` function and `TransformFromAsyncIterable` class * fix(stream-transform-from): change the complex `createSource()` method to a simple `createSource()` function * test(stream-transform-from): fix the tests + Re-create the `Transform` object every time testing. + Simplify tests that differ only in options with `describe.each()`. * refactor(stream-transform-from): use the `catch()` method instead of the `try ... catch` statement * fix(stream-transform-from): don't call the callback multiple times & call the callback passed to `_flush()` method after finish * test(stream-transform-from): use the `break` keyword while transforming a stream * fix(stream-transform-from): use the `break` keyword while transforming a stream * refactor(stream-transform-from): refactoring the `TransformFromAsyncIterable` class + Rename the `done` property to `transformCallback + Rename the `callDoneFn` method to `callTransformCallback` + Remove the `pushError` method and add `finish` method in its place * test(stream-transform-from): add tests + Added a test that returns multiple chunks + Added a test for the type of chunks contained in the source * fix(stream-transform-from): fix the type definition of return values If there is a possibility that either the `objectMode` or the `readableObjectMode` option is not `true`, the return value should not be `unknown`. * test(stream-transform-from): add a test to merge multiple chunks * fix(stream-transform-from): `transformCallback` should be called when the next chunk is needed, not after transforming a chunk * test(stream-transform-from): add type definition tests * fix(stream-transform-from): fix `InputChunkType` Index signatures and `boolean` types are now supported. * test(stream-transform-from): add type definition tests for output values * docs(stream-transform-from): add example code * test(stream-transform-from): add a test for the timing of data flowing in the stream * test(stream-transform-from): fix the timing test for data flowing in the stream I noticed that the timing of the flow changes when using the asynchronous API. * test(stream-transform-from): eliminate the PromiseRejectionHandledWarning Promise objects should only be created in tests. Creating it during test case generation could cause Promise errors to affect unrelated code. * test(stream-transform-from): fix the timing test for data flowing in the stream We have not found a way to control when data is read from the Readable stream. * fix(stream-transform-from): create a source iterator in the constructor * refactor(stream-transform-from): remove `src/utils.ts` * refactor(stream-transform-from): reduce Cognitive Complexity in the `TransformFromAsyncIterable#finish()` method Code Climate reported: + Function `finish` has a Cognitive Complexity of 6 (exceeds 5 allowed). Consider refactoring. * test(stream-transform-from): add tests that inherit from the `TransformFromAsyncIterable` class * revert: test(stream-transform-from): add tests that inherit from the `TransformFromAsyncIterable` class This reverts commit 28e9901. * test(stream-transform-from): add tests to convert strings of different encodings * feat(stream-transform-from): can use the encoding passed with the chunk * feat(stream-transform-from): export some types * test(stream-transform-from): can't get the error after processing all chunks see nodejs/node#34274 * test(stream-transform-from): the bug with not being able to get errors after processing all chunks has been fixed in Node v15 * test(stream-transform-from): fix the bug that no error occurs after getting all the chunks This bug has been fixed in Node.js v15. + https://github.com/nodejs/node/blob/master/doc/changelogs/CHANGELOG_V15.md#15.0.0 + nodejs/node#34314 However, we have found a workaround that works for anything less than Node.js v15, so we will attempt to fix this bug in this package. * fix(stream-transform-from): fix the bug that no error occurs after getting all the chunks * test(stream-transform-from): make the conditional branches of the test code readable and organized * test(stream-transform-from): refactoring test code * build(stream-transform-from): disable the inlineSources option in TypeScript The size of the package is now smaller if `src` directory contents are included. * docs(stream-transform-from): add `README.md` * ci(stream-transform-from): add custom publish scripts * docs(stream-transform-from): update `README.md` * test(stream-transform-from): add tests for options that should be ignored * fix(stream-transform-from): ignore some options * fix(stream-transform-from): add "construct" in the options to ignore * docs(stream-transform-from): update `README.md`: add fields that are disallowed in options * refactor(stream-transform-from): don't use the global object `process` * docs(stream-transform-from): fix the code for the example in `README.md` * refactor(stream-transform-from): move utility type functions to the top of file * style(stream-transform-from): update `src/index.ts`
Thanks for reading this, I know it's a little long but I believe it is relevant. Bear with me 🙏 I implemented So like many before me I wrapped it in a Except: once the AWS SDK reads the last byte from that passthrough stream, we're "done" as far as "Aha!" I said. "I'll implement But... it turns out that tons of operations are still going on even after the pipelines are all 100% convinced the are finished. I couldn't figure it out until I found this thread. The fix here in Node.js specifically decides not to wait for my Augh! Is there just no way to impose an additional requirement upon finalization in a PassThrough stream and have that be respected by pipeline()? Will I have to do something like returning both a stream and a promise, and awaiting the pipeline and then the promise? Or perhaps implementing a writable stream from scratch, whatever that looks like in 2023? I did try implementing Here is my current implementation. Note that Thanks so much for reading!
|
What steps will reproduce the bug?
How often does it reproduce? Is there a required condition?
always
What is the expected behavior?
Console output should look like:
What do you see instead?
Console output actually looks like:
Additional information
As noted in the code above, adding an extra stream at the end seems to mitigate the problem for now.
The text was updated successfully, but these errors were encountered: