Skip to content

Commit

Permalink
fix(consumer): maintain ordering of offset commit requests (#2947)
Browse files Browse the repository at this point in the history
Hold broker lock to prevent concurrent calls to commit from potentially
being re-ordered between determining which offsets to include in the
offset commit request, and sending the request to Kafka.

Move finding the coordinator before creating the request to avoid
holding the lock in the case the coordinator is unknown. This requires a
change to the "new offset manager" test, which previously didn't expect
the mock broker to receive a find coordinator request.

The newly added test simulates multiple concurrent commits for a single group
running in a single process. The expected behavior is that commits are
delivered in order to the mock broker, with no offset in the
OffsetCommitRequest being for a lower value than previously committed
for the partition.

Fixes: #2940

Co-authored-by: Michael Burgess <michburg@uk.ibm.com>
Signed-off-by: Adrian Preston <PRESTONA@uk.ibm.com>
  • Loading branch information
prestona and mpburg committed Aug 7, 2024
1 parent d2246cc commit 6b4f9be
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 2 deletions.
28 changes: 26 additions & 2 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,28 +251,52 @@ func (om *offsetManager) Commit() {
}

func (om *offsetManager) flushToBroker() {
broker, err := om.coordinator()
if err != nil {
om.handleError(err)
return
}

// Care needs to be taken to unlock this. Don't want to defer the unlock as this would
// cause the lock to be held while waiting for the broker to reply.
broker.lock.Lock()
req := om.constructRequest()
if req == nil {
broker.lock.Unlock()
return
}
resp, rp, err := sendOffsetCommit(broker, req)
broker.lock.Unlock()

broker, err := om.coordinator()
if err != nil {
om.handleError(err)
om.releaseCoordinator(broker)
_ = broker.Close()
return
}

resp, err := broker.CommitOffset(req)
err = handleResponsePromise(req, resp, rp, nil)
if err != nil {
om.handleError(err)
om.releaseCoordinator(broker)
_ = broker.Close()
return
}

broker.handleThrottledResponse(resp)
om.handleResponse(broker, req, resp)
}

func sendOffsetCommit(coordinator *Broker, req *OffsetCommitRequest) (*OffsetCommitResponse, *responsePromise, error) {
resp := new(OffsetCommitResponse)
responseHeaderVersion := resp.headerVersion()
promise, err := coordinator.send(req, true, responseHeaderVersion)
if err != nil {
return nil, nil, err
}
return resp, promise, nil
}

func (om *offsetManager) constructRequest() *OffsetCommitRequest {
r := &OffsetCommitRequest{
Version: 1,
Expand Down
104 changes: 104 additions & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sarama
import (
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -82,6 +84,9 @@ func TestNewOffsetManager(t *testing.T) {
metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse)
findCoordResponse := new(FindCoordinatorResponse)
findCoordResponse.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()}
seedBroker.Returns(findCoordResponse)
defer seedBroker.Close()

testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
Expand All @@ -102,6 +107,105 @@ func TestNewOffsetManager(t *testing.T) {
}
}

// Test that the correct sequence of offset commit messages is sent to a broker when
// multiple goroutines for a group are committing offsets at the same time
func TestOffsetManagerCommitSequence(t *testing.T) {
lastOffset := map[int32]int64{}
var outOfOrder atomic.Pointer[string]
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
seedBroker.SetHandlerFuncByMap(map[string]requestHandlerFunc{
"MetadataRequest": func(req *request) encoderWithHeader {
resp := new(MetadataResponse)
resp.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
return resp
},
"FindCoordinatorRequest": func(req *request) encoderWithHeader {
resp := new(FindCoordinatorResponse)
resp.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()}
return resp
},
"OffsetFetchRequest": func(r *request) encoderWithHeader {
req := r.body.(*OffsetFetchRequest)
resp := new(OffsetFetchResponse)
resp.Blocks = map[string]map[int32]*OffsetFetchResponseBlock{}
for topic, partitions := range req.partitions {
for _, partition := range partitions {
if _, ok := resp.Blocks[topic]; !ok {
resp.Blocks[topic] = map[int32]*OffsetFetchResponseBlock{}
}
resp.Blocks[topic][partition] = &OffsetFetchResponseBlock{
Offset: 0,
Err: ErrNoError,
}
}
}
return resp
},
"OffsetCommitRequest": func(r *request) encoderWithHeader {
req := r.body.(*OffsetCommitRequest)
if outOfOrder.Load() == nil {
for partition, offset := range req.blocks["topic"] {
last := lastOffset[partition]
if last > offset.offset {
msg := fmt.Sprintf("out of order commit to partition %d, current committed offset: %d, offset in request: %d",
partition, last, offset.offset)
outOfOrder.Store(&msg)
}
lastOffset[partition] = offset.offset
}
}

// Potentially yield, to try and avoid each Go routine running sequentially to completion
runtime.Gosched()

resp := new(OffsetCommitResponse)
resp.Errors = map[string]map[int32]KError{}
resp.Errors["topic"] = map[int32]KError{}
for partition := range req.blocks["topic"] {
resp.Errors["topic"][partition] = ErrNoError
}
return resp
},
})
testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
if err != nil {
t.Fatal(err)
}
defer safeClose(t, testClient)
om, err := NewOffsetManagerFromClient("group", testClient)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, om)

const numPartitions = 10
const commitsPerPartition = 1000

var wg sync.WaitGroup
for p := 0; p < numPartitions; p++ {
pom, err := om.ManagePartition("topic", int32(p))
if err != nil {
t.Fatal(err)
}

wg.Add(1)
go func() {
for c := 0; c < commitsPerPartition; c++ {
pom.MarkOffset(int64(c+1), "")
om.Commit()
}
wg.Done()
}()
}

wg.Wait()
errMsg := outOfOrder.Load()
if errMsg != nil {
t.Error(*errMsg)
}
}

var offsetsautocommitTestTable = []struct {
name string
set bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable
Expand Down

0 comments on commit 6b4f9be

Please sign in to comment.