Skip to content

Commit

Permalink
Ensure ordering of offset commit requests
Browse files Browse the repository at this point in the history
Add locking to prevent concurrent calls to commit from potentially being
re-ordered between determining which offsets to include in the offset
commit request, and sending the request to Kafka.

Fixes: IBM#2940
Co-authored-by: Michael Burgess <michburg@uk.ibm.com>
Signed-off-by: Adrian Preston <PRESTONA@uk.ibm.com>
  • Loading branch information
prestona and mpburg committed Jul 19, 2024
1 parent d2246cc commit e675806
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type offsetManager struct {
poms map[string]map[int32]*partitionOffsetManager
pomsLock sync.RWMutex

commitLock sync.Mutex

closeOnce sync.Once
closing chan none
closed chan none
Expand Down Expand Up @@ -251,17 +253,19 @@ func (om *offsetManager) Commit() {
}

func (om *offsetManager) flushToBroker() {
req := om.constructRequest()
if req == nil {
return
}

broker, err := om.coordinator()
if err != nil {
om.handleError(err)
return
}

om.commitLock.Lock()
defer om.commitLock.Unlock()
req := om.constructRequest()
if req == nil {
return
}

resp, err := broker.CommitOffset(req)
if err != nil {
om.handleError(err)
Expand Down

0 comments on commit e675806

Please sign in to comment.