diff --git a/offset_manager.go b/offset_manager.go index 1bf545908..294865127 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -251,18 +251,31 @@ func (om *offsetManager) Commit() { } func (om *offsetManager) flushToBroker() { + broker, err := om.coordinator() + if err != nil { + om.handleError(err) + return + } + + // Care needs to be taken to unlock this. Don't want to defer the unlock as this would + // cause the lock to be held while waiting for the broker to reply. + broker.lock.Lock() req := om.constructRequest() if req == nil { + broker.lock.Unlock() return } + resp, rp, err := sendOffsetCommit(broker, req) + broker.lock.Unlock() - broker, err := om.coordinator() if err != nil { om.handleError(err) + om.releaseCoordinator(broker) + _ = broker.Close() return } - resp, err := broker.CommitOffset(req) + err = handleResponsePromise(req, resp, rp, nil) if err != nil { om.handleError(err) om.releaseCoordinator(broker) @@ -270,9 +283,20 @@ func (om *offsetManager) flushToBroker() { return } + broker.handleThrottledResponse(resp) om.handleResponse(broker, req, resp) } +func sendOffsetCommit(coordinator *Broker, req *OffsetCommitRequest) (*OffsetCommitResponse, *responsePromise, error) { + resp := new(OffsetCommitResponse) + responseHeaderVersion := resp.headerVersion() + promise, err := coordinator.send(req, true, responseHeaderVersion) + if err != nil { + return nil, nil, err + } + return resp, promise, nil +} + func (om *offsetManager) constructRequest() *OffsetCommitRequest { r := &OffsetCommitRequest{ Version: 1, diff --git a/offset_manager_test.go b/offset_manager_test.go index 04322fce7..971ed3a42 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -3,6 +3,8 @@ package sarama import ( "errors" "fmt" + "runtime" + "sync" "sync/atomic" "testing" "time" @@ -82,6 +84,9 @@ func TestNewOffsetManager(t *testing.T) { metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse) + findCoordResponse := new(FindCoordinatorResponse) + findCoordResponse.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()} + seedBroker.Returns(findCoordResponse) defer seedBroker.Close() testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) @@ -102,6 +107,105 @@ func TestNewOffsetManager(t *testing.T) { } } +// Test that the correct sequence of offset commit messages is sent to a broker when +// multiple goroutines for a group are committing offsets at the same time +func TestOffsetManagerCommitSequence(t *testing.T) { + lastOffset := map[int32]int64{} + var outOfOrder atomic.Pointer[string] + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + seedBroker.SetHandlerFuncByMap(map[string]requestHandlerFunc{ + "MetadataRequest": func(req *request) encoderWithHeader { + resp := new(MetadataResponse) + resp.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + return resp + }, + "FindCoordinatorRequest": func(req *request) encoderWithHeader { + resp := new(FindCoordinatorResponse) + resp.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()} + return resp + }, + "OffsetFetchRequest": func(r *request) encoderWithHeader { + req := r.body.(*OffsetFetchRequest) + resp := new(OffsetFetchResponse) + resp.Blocks = map[string]map[int32]*OffsetFetchResponseBlock{} + for topic, partitions := range req.partitions { + for _, partition := range partitions { + if _, ok := resp.Blocks[topic]; !ok { + resp.Blocks[topic] = map[int32]*OffsetFetchResponseBlock{} + } + resp.Blocks[topic][partition] = &OffsetFetchResponseBlock{ + Offset: 0, + Err: ErrNoError, + } + } + } + return resp + }, + "OffsetCommitRequest": func(r *request) encoderWithHeader { + req := r.body.(*OffsetCommitRequest) + if outOfOrder.Load() == nil { + for partition, offset := range req.blocks["topic"] { + last := lastOffset[partition] + if last > offset.offset { + msg := fmt.Sprintf("out of order commit to partition %d, current committed offset: %d, offset in request: %d", + partition, last, offset.offset) + outOfOrder.Store(&msg) + } + lastOffset[partition] = offset.offset + } + } + + // Potentially yield, to try and avoid each Go routine running sequentially to completion + runtime.Gosched() + + resp := new(OffsetCommitResponse) + resp.Errors = map[string]map[int32]KError{} + resp.Errors["topic"] = map[int32]KError{} + for partition := range req.blocks["topic"] { + resp.Errors["topic"][partition] = ErrNoError + } + return resp + }, + }) + testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, testClient) + om, err := NewOffsetManagerFromClient("group", testClient) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, om) + + const numPartitions = 10 + const commitsPerPartition = 1000 + + var wg sync.WaitGroup + for p := 0; p < numPartitions; p++ { + pom, err := om.ManagePartition("topic", int32(p)) + if err != nil { + t.Fatal(err) + } + + wg.Add(1) + go func() { + for c := 0; c < commitsPerPartition; c++ { + pom.MarkOffset(int64(c+1), "") + om.Commit() + } + wg.Done() + }() + } + + wg.Wait() + errMsg := outOfOrder.Load() + if errMsg != nil { + t.Error(*errMsg) + } +} + var offsetsautocommitTestTable = []struct { name string set bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable