From 6b4f9becca8bc225942ba863f6f1187b07dbac82 Mon Sep 17 00:00:00 2001
From: Adrian Preston
Date: Wed, 7 Aug 2024 11:17:22 +0100
Subject: [PATCH] fix(consumer): maintain ordering of offset commit requests
 (#2947)

Hold the broker lock so that concurrent calls to commit cannot be
re-ordered between determining which offsets to include in the offset
commit request and sending that request to Kafka.

Move finding the coordinator to before constructing the request, so the
lock is not held while the coordinator is still unknown. This requires a
change to the "new offset manager" test, which previously did not expect
the mock broker to receive a find coordinator request.

The newly added test simulates multiple concurrent commits for a single
group running in a single process. The expected behavior is that commits
are delivered to the mock broker in order, with no offset in an
OffsetCommitRequest being lower than a value previously committed for
that partition.

Fixes: #2940

Co-authored-by: Michael Burgess
Signed-off-by: Adrian Preston
---
 offset_manager.go      |  28 ++++++++++-
 offset_manager_test.go | 104 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 130 insertions(+), 2 deletions(-)
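
The essence of the fix is that building the request and handing it to the
broker's ordered send path now happen atomically under broker.lock, while
the reply is still awaited outside the lock so concurrent commits pipeline
rather than serialize on round trips. The standalone Go sketch below
illustrates that snapshot-and-enqueue pattern; it is not sarama code, and
every name in it (committer, mark, commit, wire) is invented for the
example:

package main

import (
	"fmt"
	"sync"
)

// committer is a stand-in for sarama's offset manager plus coordinator:
// mu plays the role of broker.lock, wire the ordered broker connection.
type committer struct {
	mu     sync.Mutex
	offset int64      // highest offset marked so far
	wire   chan int64 // requests, in the order they reach the "broker"
}

func (c *committer) mark(o int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if o > c.offset {
		c.offset = o
	}
}

// commit snapshots the state and enqueues the request under one lock,
// mirroring the patch: constructRequest and sendOffsetCommit happen while
// broker.lock is held, but the reply is awaited only after unlocking.
func (c *committer) commit() {
	c.mu.Lock()
	snapshot := c.offset // "constructRequest"
	c.wire <- snapshot   // "sendOffsetCommit" (enqueue only, no wait)
	c.mu.Unlock()
	// a real implementation would now wait on the response promise
}

func main() {
	c := &committer{wire: make(chan int64, 64)}
	var wg sync.WaitGroup
	for i := 1; i <= 64; i++ {
		wg.Add(1)
		go func(o int64) {
			defer wg.Done()
			c.mark(o)
			c.commit()
		}(int64(i))
	}
	wg.Wait()
	close(c.wire)

	// Because snapshot+enqueue is atomic, offsets on the wire never
	// regress; move the Unlock above the channel send and they can,
	// which is exactly the re-ordering this patch prevents.
	var last int64
	for o := range c.wire {
		if o < last {
			fmt.Printf("out of order: %d after %d\n", o, last)
		}
		last = o
	}
}
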
diff --git a/offset_manager.go b/offset_manager.go
index 1bf545908..294865127 100644
--- a/offset_manager.go
+++ b/offset_manager.go
@@ -251,18 +251,31 @@ func (om *offsetManager) Commit() {
 }
 
 func (om *offsetManager) flushToBroker() {
+	broker, err := om.coordinator()
+	if err != nil {
+		om.handleError(err)
+		return
+	}
+
+	// Care needs to be taken to unlock this. Don't want to defer the unlock as this would
+	// cause the lock to be held while waiting for the broker to reply.
+	broker.lock.Lock()
 	req := om.constructRequest()
 	if req == nil {
+		broker.lock.Unlock()
 		return
 	}
+	resp, rp, err := sendOffsetCommit(broker, req)
+	broker.lock.Unlock()
 
-	broker, err := om.coordinator()
 	if err != nil {
 		om.handleError(err)
+		om.releaseCoordinator(broker)
+		_ = broker.Close()
 		return
 	}
 
-	resp, err := broker.CommitOffset(req)
+	err = handleResponsePromise(req, resp, rp, nil)
 	if err != nil {
 		om.handleError(err)
 		om.releaseCoordinator(broker)
@@ -270,9 +283,20 @@ func (om *offsetManager) flushToBroker() {
 		return
 	}
 
+	broker.handleThrottledResponse(resp)
 	om.handleResponse(broker, req, resp)
 }
 
+func sendOffsetCommit(coordinator *Broker, req *OffsetCommitRequest) (*OffsetCommitResponse, *responsePromise, error) {
+	resp := new(OffsetCommitResponse)
+	responseHeaderVersion := resp.headerVersion()
+	promise, err := coordinator.send(req, true, responseHeaderVersion)
+	if err != nil {
+		return nil, nil, err
+	}
+	return resp, promise, nil
+}
+
 func (om *offsetManager) constructRequest() *OffsetCommitRequest {
 	r := &OffsetCommitRequest{
 		Version:                 1,
diff --git a/offset_manager_test.go b/offset_manager_test.go
index 04322fce7..971ed3a42 100644
--- a/offset_manager_test.go
+++ b/offset_manager_test.go
@@ -3,6 +3,8 @@ package sarama
 import (
 	"errors"
 	"fmt"
+	"runtime"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -82,6 +84,9 @@ func TestNewOffsetManager(t *testing.T) {
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
 	seedBroker.Returns(metadataResponse)
+	findCoordResponse := new(FindCoordinatorResponse)
+	findCoordResponse.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()}
+	seedBroker.Returns(findCoordResponse)
 	defer seedBroker.Close()
 
 	testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
@@ -102,6 +107,105 @@ func TestNewOffsetManager(t *testing.T) {
 	}
 }
 
+// Test that the correct sequence of offset commit messages is sent to a broker when
+// multiple goroutines for a group are committing offsets at the same time
+func TestOffsetManagerCommitSequence(t *testing.T) {
+	lastOffset := map[int32]int64{}
+	var outOfOrder atomic.Pointer[string]
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+	seedBroker.SetHandlerFuncByMap(map[string]requestHandlerFunc{
+		"MetadataRequest": func(req *request) encoderWithHeader {
+			resp := new(MetadataResponse)
+			resp.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
+			return resp
+		},
+		"FindCoordinatorRequest": func(req *request) encoderWithHeader {
+			resp := new(FindCoordinatorResponse)
+			resp.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()}
+			return resp
+		},
+		"OffsetFetchRequest": func(r *request) encoderWithHeader {
+			req := r.body.(*OffsetFetchRequest)
+			resp := new(OffsetFetchResponse)
+			resp.Blocks = map[string]map[int32]*OffsetFetchResponseBlock{}
+			for topic, partitions := range req.partitions {
+				for _, partition := range partitions {
+					if _, ok := resp.Blocks[topic]; !ok {
+						resp.Blocks[topic] = map[int32]*OffsetFetchResponseBlock{}
+					}
+					resp.Blocks[topic][partition] = &OffsetFetchResponseBlock{
+						Offset: 0,
+						Err:    ErrNoError,
+					}
+				}
+			}
+			return resp
+		},
+		"OffsetCommitRequest": func(r *request) encoderWithHeader {
+			req := r.body.(*OffsetCommitRequest)
+			if outOfOrder.Load() == nil {
+				for partition, offset := range req.blocks["topic"] {
+					last := lastOffset[partition]
+					if last > offset.offset {
+						msg := fmt.Sprintf("out of order commit to partition %d, current committed offset: %d, offset in request: %d",
+							partition, last, offset.offset)
+						outOfOrder.Store(&msg)
+					}
+					lastOffset[partition] = offset.offset
+				}
+			}
+
+			// Potentially yield, to try and avoid each Go routine running sequentially to completion
+			runtime.Gosched()
+
+			resp := new(OffsetCommitResponse)
+			resp.Errors = map[string]map[int32]KError{}
+			resp.Errors["topic"] = map[int32]KError{}
+			for partition := range req.blocks["topic"] {
+				resp.Errors["topic"][partition] = ErrNoError
+			}
+			return resp
+		},
+	})
+	testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer safeClose(t, testClient)
+	om, err := NewOffsetManagerFromClient("group", testClient)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer safeClose(t, om)
+
+	const numPartitions = 10
+	const commitsPerPartition = 1000
+
+	var wg sync.WaitGroup
+	for p := 0; p < numPartitions; p++ {
+		pom, err := om.ManagePartition("topic", int32(p))
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		wg.Add(1)
+		go func() {
+			for c := 0; c < commitsPerPartition; c++ {
+				pom.MarkOffset(int64(c+1), "")
+				om.Commit()
+			}
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+	errMsg := outOfOrder.Load()
+	if errMsg != nil {
+		t.Error(*errMsg)
+	}
+}
+
 var offsetsautocommitTestTable = []struct {
 	name string
 	set  bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable