Skip to content

Commit

Permalink
Merge pull request #658 from Shopify/offset-manager-shutdown-take-2
Browse files Browse the repository at this point in the history
Fix race condition on OffsetManager shutdown
  • Loading branch information
eapache committed May 10, 2016
2 parents 89bd629 + 53285c9 commit 2acd68e
Showing 1 changed file with 6 additions and 11 deletions.
17 changes: 6 additions & 11 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type partitionOffsetManager struct {
offset int64
metadata string
dirty bool
clean chan none
clean sync.Cond
broker *brokerOffsetManager

errors chan *ConsumerError
Expand All @@ -189,11 +189,11 @@ func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32
parent: om,
topic: topic,
partition: partition,
clean: make(chan none),
errors: make(chan *ConsumerError, om.conf.ChannelBufferSize),
rebalance: make(chan none, 1),
dying: make(chan none),
}
pom.clean.L = &pom.lock

if err := pom.selectBroker(); err != nil {
return nil, err
Expand Down Expand Up @@ -331,11 +331,7 @@ func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string

if pom.offset == offset && pom.metadata == metadata {
pom.dirty = false

select {
case pom.clean <- none{}:
default:
}
pom.clean.Signal()
}
}

Expand All @@ -353,11 +349,10 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) {
func (pom *partitionOffsetManager) AsyncClose() {
go func() {
pom.lock.Lock()
dirty := pom.dirty
pom.lock.Unlock()
defer pom.lock.Unlock()

if dirty {
<-pom.clean
for pom.dirty {
pom.clean.Wait()
}

close(pom.dying)
Expand Down

0 comments on commit 2acd68e

Please sign in to comment.