-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(pubsub): Closes #10094 - memory leak in pubsub receive #10153
Conversation
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request. |
🤖 I detect that the PR title and the commit message differ and there's only one commit. To use the PR title for the commit history, you can use Github's automerge feature with squashing, or use -- conventional-commit-lint bot |
So the reason why this keeps failing continuous is because a few of the tests are brittle and expect |
Oh right. Yeah, up to you whether you think |
I missed a few checks in the recent PR, will be making another change for this. |
pubsub/iterator.go
Outdated
@@ -249,6 +252,9 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { | |||
} | |||
// Any error here is fatal. | |||
if err != nil { | |||
if status.Code(err) == codes.Canceled { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On second thought, I think we shouldn't change this. Changing this is technically a breaking change, even though it's fairly unlikely. Did you have a strong reason to add this? I couldn't tell from an initial pass how this is related to the memory leak.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it’s indeed just that it wasn’t clear to me whether the caller or the function had the responsibility of handling ‘cancelled’ error: I was under the impression, given that cancelling the provided context is the intended way to stop receiving messages, that Receive should then just return nil, but feel free to adjust/remove this as you see fit.
From what you were saying about unit tests failing, sounds like this change here is not enough for that anyway…
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if I were to go back and redesign this, Receive
should return nil, but given that we have returned Canceled
up until now, we should leave it as is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, sorry, looked at it more closely now: I did introduce this change because without it the behaviour does also change, as previously recvMessages
would not return a cancellation error when terminating the Receive
call, while with the suggested change it would (and the error would propagate back).
I guess the point is my fix in the error check here is a bit too blunt as it introduces two other changes (presumably the source of failing tests):
- Seems like
pullMessages
in theit.po.synchronous==true
branch is expected to return cancellation errors in some situations? - The
recvMessages
call currently does return a cancellation error if it's the undelying client that gets closed (rather than just the context passed toReceive
), which gets tested inTestStreamingPull_ClosedClient
.
If you want to preserve this exact same behaviour, the fix would thus be
--- a/pubsub/iterator.go
+++ b/pubsub/iterator.go
@@ -249,12 +249,12 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
rmsgs, err = it.pullMessages(maxToPull)
} else {
rmsgs, err = it.recvMessages()
+ if status.Code(err) == codes.Canceled && it.ps.ctx.Err() == context.Canceled {
+ err = io.EOF
+ }
}
// Any error here is fatal.
if err != nil {
- if status.Code(err) == codes.Canceled {
- err = io.EOF
- }
return nil, it.fail(err)
}
recordStat(it.ctx, PullCount, int64(len(rmsgs)))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically synchronous mode is deprecated, but yes let's avoid making breaking changes there regardless.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, well, I've pushed the change above, along with a comment. Let me know if you're happy with it!
Hopefully I'm not overstepping: opened #10186 to address the failure in CI here... (finally figured out where to get the test logs! hahah) |
Thanks! I'll leave it for the relevant folks to review since I don't own those parts of this repository. With that said, there was a bug that was causing our presubmits to run non-pubsub related tests, which was just fixed in #10185. That shouldn't be a blocker for this PR now |
Hmm, the latest test failure ( Let me know your preferred fix for this: should I add this specific check, i.e. |
I think the latter is better. Going from returning an error to |
Oh, thank you for your patience with me overlooking all these things... Just glad we sort everything out before it gets merged and shipped! Hmm, not sure I follow your argument here, sorry. In fact after writing the above I had second thoughts about that third option: wouldn't the caller of |
Hm fair point. We should check for the |
Yes, agreed: this seems reasonable enough. Pushing the (hopefully) final fix... |
See discussion in #10094.