From f0f5b0ae1ae85915787dfc8fd196eeb874f7032d Mon Sep 17 00:00:00 2001 From: Nimi Wariboko Jr Date: Mon, 22 Aug 2016 21:07:59 -0700 Subject: [PATCH] Consumer: fix deadlock - if the timer had already expired, don't try to read the channel again. --- consumer.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index 5c7f90932..ff7318afd 100644 --- a/consumer.go +++ b/consumer.go @@ -414,21 +414,24 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 { func (child *partitionConsumer) responseFeeder() { var msgs []*ConsumerMessage expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime) + expireTimedOut := false feederLoop: for response := range child.feeder { msgs, child.responseResult = child.parseResponse(response) for i, msg := range msgs { - if !expiryTimer.Stop() { + if !expiryTimer.Stop() && !expireTimedOut { // expiryTimer was expired; clear out the waiting msg <-expiryTimer.C } expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime) + expireTimedOut = false select { case child.messages <- msg: case <-expiryTimer.C: + expireTimedOut = true child.responseResult = errTimedOut child.broker.acks.Done() for _, msg = range msgs[i:] {