Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Drain() infinite loop and add test for concurrent Next() calls #1525

Merged
merged 6 commits into from
Jan 15, 2024

Conversation

mdawar
Copy link
Contributor

@mdawar mdawar commented Jan 14, 2024

This is a possible fix for #1524.

Previously when calling Stop() we were checking for the done channel to be closed to exit the loop in Next(), but as stated in #1524 this didn't work for Drain() as it also closes the done channel, so now we check the msgs channel if closed to exit the loop, but this required some changes to how locking was done to prevent a deadlock, as Next() was holding the lock until it returns (which might take a long time), that prevented cleanup() from executing (waiting for the lock) to unsubscribe (where the msgs channel is closed).

Changes in this pull request:

  1. Added test for graceful shutdown for Drain()
  2. Added test for concurrent Next() calls with StopAfter option (auto unsubscribe)
  3. Removed the lock from cleanup() function
  4. Removed an unused drained channel from pullSubscription

Maybe a better solution would be to use multiple locks, for example one lock dedicated only for the subscription.

@piotrpio piotrpio self-requested a review January 14, 2024 21:02
Copy link
Collaborator

@piotrpio piotrpio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really good work here and again, thank you for the detailed issue and for the PR! There are some problems with your approach though in my opinion, as explained in the comment below - please let me know what you think.

@@ -537,69 +535,79 @@ var (
)

func (s *pullSubscription) Next() (Msg, error) {
s.Lock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think removing the lock from the entire Next() execution is a good idea. For example, it can cause problems with StopAfter option where executing Next() concurrently may lead to delivering more messages to the caller than specified in StopAfter option.

Holding the lock for the duration of Next() is challenging, and you're absolutely right, we need to have a way to unlock it for cleanup - therefore, I believe catching tle closure of s.done is necessary. Based on your branch I came up with a different solution (it's a bit crude right now as I just wanted to give an example, this would have to be cleaned up a bit): https://github.com/nats-io/nats.go/blob/fix-drain-in-messages/jetstream/pull.go#L537

Here's the gist of it:

  1. When s.done is closed, we unlock the mutex, so that the subscription can be cleaned up properly and s.msgs can be closed. We need a way to conditionally unlock mutex in defer, thus the done bool (I really don't like that...)

  2. If we detect we are draining, we set done to true and continue. Next iterations of the loop will check for the state of done and if it's set, will go to a select statement which does not listen on s.done. Those 2 select statements are identical except whether or not we have case <-s.done.

I extracted handleIncomingMessage() and handleError() methods to make it a bit more readable, but now it's just copy-paste from the select.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, as you mentioned, we could use a separate lock just to make sure Next() cannot be executed concurrently

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry about that, I overlooked this StopAfter option, I think we should I add a test to verify this behavior first before we move on, the current test calls Next() sequentially.
After we add this test we should be able to refactor the code without breaking things.

There are also more elegant solutions:

  1. Using a dedicated lock for the subscription (Used in cleanup()) and another lock for the counter fields or maybe the rest of the fields
  2. Use atomic values for the counter fields

Which solution do you prefer?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using a separate lock for accessing subscription would be a preferable solution since I would be hesitant about allowing concurrent Next() calls - for concurrency, the suggested solution would be to create a whole new MessagesContext() for the same consumer. Separate lock sounds like it could actually simplify some things though, so that's nice.

Do you have time and would like to tackle this? Or should I take over?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@piotrpio yes you're right this is a better solution.
I started working on adding a test to verify Next() concurrent calls so any changes afterwards won't break this behavior.
I'll see what I can do, and you too feel free to do whatever is best.
I'll keep you updated with what I come up with.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that in cleanup() there's no risk of race conditions, so it might be ok without holding the lock.
It only reads the subscription field which is set by methods that create the pullSubscription struct like pullConsumer.Consume, pullConsumer.Messages and pullConsumer.fetch and the actual Subscription has it's own mutex.

But I don't know if this acceptable, I mean for future changes to the code that might introduce race conditions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's ok right now (and looks like it is) and it does not produce a race, we can try to go without the lock I think - if in the future we will need locking mechanisms we can always add it. Just please (if you're working on it), add an appropriate comment on why the lock is not needed.

Thank you again for your contribution, it's extremely valuable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will add the comment right now.

@mdawar mdawar changed the title Fix Drain() infinite loop and minimize lock scope in Next() Fix Drain() infinite loop and add test for concurrent Next() calls Jan 15, 2024
Copy link
Collaborator

@piotrpio piotrpio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks great, LGTM!

@piotrpio piotrpio merged commit a8a8d18 into nats-io:main Jan 15, 2024
1 check passed
@mdawar mdawar deleted the drain-fix branch January 16, 2024 07:33
@piotrpio piotrpio mentioned this pull request Feb 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants