diff --git a/consumer.go b/consumer.go index be4e6d4a3..96226ac5b 100644 --- a/consumer.go +++ b/consumer.go @@ -310,6 +310,7 @@ type partitionConsumer struct { trigger, dying chan none responseResult error + closeOnce sync.Once fetchSize int32 offset int64 @@ -412,7 +413,9 @@ func (child *partitionConsumer) AsyncClose() { // the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and // 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will // also just close itself) - close(child.dying) + child.closeOnce.Do(func() { + close(child.dying) + }) } func (child *partitionConsumer) Close() error {