From d41f123b4a51bb66e455c68d850467dadb129944 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 5 Jul 2016 08:59:42 -0400 Subject: [PATCH] Consumer: fix possible tight loop In certain cases the `subscriptionManager` and drain loop in `abort` could start spinning against each other for a non-trivial amount of time if, for example, a partition was suspended waiting for the user to consume its buffer. The tight loop has always been there; it has just tended to be trivially short in normal operation, so nobody noticed. This is the same issue that the `wait` channel was introduced to solve for the `subscriptionConsumer`, so solve it the same way here. --- consumer.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/consumer.go b/consumer.go index f6c979766..c001794e1 100644 --- a/consumer.go +++ b/consumer.go @@ -540,7 +540,7 @@ func (bc *brokerConsumer) subscriptionManager() { var buffer []*partitionConsumer // The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer - // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks + // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks // up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available, // so the main goroutine can block waiting for work if it has none. @@ -671,8 +671,12 @@ func (bc *brokerConsumer) abort(err error) { child.trigger <- none{} } - for newSubscription := range bc.newSubscriptions { - for _, child := range newSubscription { + for newSubscriptions := range bc.newSubscriptions { + if len(newSubscriptions) == 0 { + <-bc.wait + continue + } + for _, child := range newSubscriptions { child.sendError(err) child.trigger <- none{} }