diff --git a/offset_manager.go b/offset_manager.go index 15ddecbc8..ebfd8b403 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -176,7 +176,7 @@ type partitionOffsetManager struct { offset int64 metadata string dirty bool - clean chan none + clean sync.Cond broker *brokerOffsetManager errors chan *ConsumerError @@ -189,11 +189,11 @@ func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32 parent: om, topic: topic, partition: partition, - clean: make(chan none), errors: make(chan *ConsumerError, om.conf.ChannelBufferSize), rebalance: make(chan none, 1), dying: make(chan none), } + pom.clean.L = &pom.lock if err := pom.selectBroker(); err != nil { return nil, err @@ -331,11 +331,7 @@ func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string if pom.offset == offset && pom.metadata == metadata { pom.dirty = false - - select { - case pom.clean <- none{}: - default: - } + pom.clean.Signal() } } @@ -353,11 +349,10 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) { func (pom *partitionOffsetManager) AsyncClose() { go func() { pom.lock.Lock() - dirty := pom.dirty - pom.lock.Unlock() + defer pom.lock.Unlock() - if dirty { - <-pom.clean + for pom.dirty { + pom.clean.Wait() } close(pom.dying)