Skip to content

Commit

Permalink
Consumer: fix possible tight loop
Browse files Browse the repository at this point in the history
In certain cases the `subscriptionManager` and drain loop in `abort` could start
spinning against each other for a non-trivial amount of time, if e.g. a
partition was suspended waiting for the user to consume its buffer. The tight
loop has always been there, it has just tended to be trivially short in normal
operation so nobody noticed.

This is the same issue that the `wait` channel was introduced to solve for the
`subscriptionConsumer` so solve it the same way.
  • Loading branch information
eapache committed Jul 5, 2016
1 parent 1a3d124 commit d41f123
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func (bc *brokerConsumer) subscriptionManager() {
var buffer []*partitionConsumer

// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
// so the main goroutine can block waiting for work if it has none.
Expand Down Expand Up @@ -671,8 +671,12 @@ func (bc *brokerConsumer) abort(err error) {
child.trigger <- none{}
}

for newSubscription := range bc.newSubscriptions {
for _, child := range newSubscription {
for newSubscriptions := range bc.newSubscriptions {
if len(newSubscriptions) == 0 {
<-bc.wait
continue
}
for _, child := range newSubscriptions {
child.sendError(err)
child.trigger <- none{}
}
Expand Down

0 comments on commit d41f123

Please sign in to comment.