Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure we always have called Add() on the inflight counter before we Wait() for it #450

Merged
merged 1 commit into from
May 18, 2015

Conversation

wvanbergen
Copy link
Contributor

@eapache This is not the cause of #449, but fixes another a potential issue when using the waitgroup.

@@ -610,6 +611,7 @@ func (p *asyncProducer) retryHandler() {

func (p *asyncProducer) shutdown() {
Logger.Println("Producer shutting down.")
p.inFlight.Add(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is necessary? If you create a wait group and then immediately Wait on it (without ever calling Add) it just returns right away...

@wvanbergen
Copy link
Contributor Author

I think, this could potentially be an issue:

p := NewAsyncProducer(...)
p.Input() <- msg
p.Close()

if the Close call gets handled before the goroutine has read the first message from the input channel, the inflight counter is still 0.

@eapache
Copy link
Contributor

eapache commented May 14, 2015

Oh interesting, you're right that's a legit issue. However, I'm not sure your patch fixes all the cases :)

p := NewAsyncProducer(...)
p.Close()
p.Input() <- msg

With your patch, if the input message is handled before the waiting goroutine wakes up, then we will increment the counter from 0 before immediately decrementing it... so I think we must stop doing the "increment+decrement" trick for messages we return with the shutting-down error as well.

@wvanbergen
Copy link
Contributor Author

I don't really understand this one? Close() doesn't return until the Wait() is finished?

@eapache
Copy link
Contributor

eapache commented May 15, 2015

  1. Close() calls Wait()
  2. Go switches contexts
  3. the message is sent on Input(), is received, causes the wait counter to increment from 0 (the problem case) and decrement again since it is immediately returned as an error

@wvanbergen
Copy link
Contributor Author

(I assume you means AsyncClose(), otherwise message on the input channel can never happen until the Wait() is finished.)

I don't really see what the problem is: after calling AsyncClose(), no message will ever make it into the processing logic, and we always wait for all the messages being processed to be completed. The worst that can happen is that we wait a bit too long, for msg to be returned as error?

@eapache
Copy link
Contributor

eapache commented May 15, 2015

I assume you means AsyncClose()

Ya, sorry

no message will ever make it into the processing logic

But it will still cause the wait counter to be Add(1)ed to, which is the bug

@eapache
Copy link
Contributor

eapache commented May 18, 2015

OK, this is an improvement regardless so I'll merge it; I'll also make a PR which should (I hope) fix and demonstrate the other case I'm thinking of.

eapache added a commit that referenced this pull request May 18, 2015
Ensure we always have called Add() on the inflight counter before we Wait() for it
@eapache eapache merged commit 6468e16 into master May 18, 2015
@eapache eapache deleted the producer_cleanup branch May 18, 2015 18:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants