diff --git a/consumer.go b/consumer.go index 5c7f90932..ff7318afd 100644 --- a/consumer.go +++ b/consumer.go @@ -414,21 +414,24 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 { func (child *partitionConsumer) responseFeeder() { var msgs []*ConsumerMessage expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime) + expireTimedOut := false feederLoop: for response := range child.feeder { msgs, child.responseResult = child.parseResponse(response) for i, msg := range msgs { - if !expiryTimer.Stop() { + if !expiryTimer.Stop() && !expireTimedOut { // expiryTimer was expired; clear out the waiting msg <-expiryTimer.C } expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime) + expireTimedOut = false select { case child.messages <- msg: case <-expiryTimer.C: + expireTimedOut = true child.responseResult = errTimedOut child.broker.acks.Done() for _, msg = range msgs[i:] {