diff --git a/consumer.go b/consumer.go index f6c979766..c001794e1 100644 --- a/consumer.go +++ b/consumer.go @@ -540,7 +540,7 @@ func (bc *brokerConsumer) subscriptionManager() { var buffer []*partitionConsumer // The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer - // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks + // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks // up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available, // so the main goroutine can block waiting for work if it has none. @@ -671,8 +671,12 @@ func (bc *brokerConsumer) abort(err error) { child.trigger <- none{} } - for newSubscription := range bc.newSubscriptions { - for _, child := range newSubscription { + for newSubscriptions := range bc.newSubscriptions { + if len(newSubscriptions) == 0 { + <-bc.wait + continue + } + for _, child := range newSubscriptions { child.sendError(err) child.trigger <- none{} }