Skip to content

Commit

Permalink
A little refactoring and add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
neil-xie committed Jan 2, 2024
1 parent c6f2a1a commit be69aea
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 12 deletions.
11 changes: 5 additions & 6 deletions common/messaging/kafka/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (h *consumerHandlerImpl) getCurrentSession() sarama.ConsumerGroupSession {
return h.currentSession
}

func (h *consumerHandlerImpl) completeMessage(message *messageImpl, isAck bool) {
func (h *consumerHandlerImpl) completeMessage(message *messageImpl, isAck bool) error {
h.RLock()
defer h.RUnlock()

Expand Down Expand Up @@ -215,9 +215,10 @@ func (h *consumerHandlerImpl) completeMessage(message *messageImpl, isAck bool)
h.logger.Error("Failed to complete an message that hasn't been added to the partition",
tag.KafkaPartition(message.Partition()),
tag.KafkaOffset(message.Offset()))
return
return err
}
h.currentSession.MarkOffset(h.topic, message.Partition(), ackLevel+1, "")
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
Expand Down Expand Up @@ -265,16 +266,14 @@ func (m *messageImpl) Ack() error {
if m.isFromPreviousSession() {
return nil
}
m.handler.completeMessage(m, true)
return nil
return m.handler.completeMessage(m, true)
}

func (m *messageImpl) Nack() error {
if m.isFromPreviousSession() {
return nil
}
m.handler.completeMessage(m, false)
return nil
return m.handler.completeMessage(m, false)
}

func (m *messageImpl) isFromPreviousSession() bool {
Expand Down
166 changes: 160 additions & 6 deletions common/messaging/kafka/consumer_impl_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,53 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package kafka

import (
"context"
"sync"
"testing"

"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
"github.com/stretchr/testify/assert"
"github.com/uber-go/tally"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"testing"
)

func TestNewConsumer(t *testing.T) {
mockProducer := mocks.NewSyncProducer(t, nil)
group := "tests"
mockBroker := initMockBroker(t, group)
defer mockBroker.Close()
brokerAddr := []string{mockBroker.Addr()}
kafkaConfig := &config.KafkaConfig{
Clusters: map[string]config.ClusterConfig{
"test-cluster": {
Brokers: []string{"test-brokers"},
Brokers: brokerAddr,
},
},
Topics: map[string]config.TopicConfig{
Expand All @@ -39,11 +70,10 @@ func TestNewConsumer(t *testing.T) {
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
logger := testlogger.New(t)
kafkaProducer := NewKafkaProducer(topic, mockProducer, logger)
_, err := newKafkaConsumer(kafkaProducer, kafkaConfig, topic, consumerName,
consumer, err := newKafkaConsumer(kafkaProducer, kafkaConfig, topic, consumerName,
nil, metricsClient, logger)
// test will fail at the sarama.NewConsumerGroup which requires to connect to actual broker
// actual functionality will be tested in the integration test
assert.EqualError(t, err, "kafka: client has run out of available brokers to talk to: dial tcp: address test-brokers: missing port in address")
assert.NoError(t, err, "An error was not expected but got %v", err)
assert.NotNil(t, consumer, "Expected consumer but got nil")
}

func TestNewConsumerHandlerImpl(t *testing.T) {
Expand All @@ -61,3 +91,127 @@ func TestNewConsumerHandlerImpl(t *testing.T) {
// Close the channel at the end of the test
close(msgChan)
}

// test multiple methods related to messageImpl since setup is repeated
func TestMessageImpl(t *testing.T) {
topic := "test-topic"
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
logger := testlogger.New(t)
mockProducer := mocks.NewSyncProducer(t, nil)
kafkaProducer := NewKafkaProducer(topic, mockProducer, logger)
msgChan := make(chan messaging.Message, 1)
consumerHandler := newConsumerHandlerImpl(kafkaProducer, topic, msgChan, metricsClient, logger)
consumerGroupSession := NewMockConsumerGroupSession(int32(1))
partition := int32(100)
offset := int64(0)
msgImpl := &messageImpl{
saramaMsg: &sarama.ConsumerMessage{
Topic: topic,
Partition: partition,
Offset: offset,
},
session: consumerGroupSession,
handler: consumerHandler,
logger: logger,
}

// Ack message that is from a previous session
msgImpl.handler.Setup(NewMockConsumerGroupSession(int32(2)))
err := msgImpl.Ack()
assert.NoError(t, err)

// normal case
msgImpl.handler.Setup(NewMockConsumerGroupSession(int32(1)))
msgImpl.handler.manager.AddMessage(partition, offset)
err = msgImpl.Ack()
assert.NoError(t, err)

// Nack message that is from a previous session
msgImpl.handler.Setup(NewMockConsumerGroupSession(int32(2)))
err = msgImpl.Nack()
assert.NoError(t, err)

// normal case
msgImpl.handler.Setup(NewMockConsumerGroupSession(int32(1)))
mockProducer.ExpectSendMessageAndSucceed()
msgImpl.handler.manager.AddMessage(partition, offset)
err = msgImpl.Nack()
assert.NoError(t, err)

close(msgChan)
}

// MockConsumerGroupSession implements sarama.ConsumerGroupSession for testing purposes.
type MockConsumerGroupSession struct {
claims map[string][]int32
memberID string
generationID int32
offsets map[string]map[int32]int64 // You can store offsets here to verify MarkOffset calls.
commitCalled bool
mu sync.Mutex
context context.Context
}

func NewMockConsumerGroupSession(generationID int32) *MockConsumerGroupSession {
return &MockConsumerGroupSession{
claims: map[string][]int32{},
memberID: "test-member",
generationID: generationID,
offsets: map[string]map[int32]int64{},
commitCalled: false,
}
}

func (m *MockConsumerGroupSession) Claims() map[string][]int32 {
return m.claims
}

func (m *MockConsumerGroupSession) MemberID() string {
return m.memberID
}

func (m *MockConsumerGroupSession) GenerationID() int32 {
return m.generationID
}

func (m *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
m.mu.Lock()
defer m.mu.Unlock()

if m.offsets[topic] == nil {
m.offsets[topic] = make(map[int32]int64)
}
m.offsets[topic][partition] = offset
}

func (m *MockConsumerGroupSession) Commit() {
m.mu.Lock()
defer m.mu.Unlock()
m.commitCalled = true
}

func (m *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
// not needed for testing
}

func (m *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string) {
// not needed for testing
}

func (m *MockConsumerGroupSession) Context() context.Context {
// Return a context, you can use context.Background() or a custom context if needed
return m.context
}

func initMockBroker(t *testing.T, group string) *sarama.MockBroker {
topics := []string{"test-topic"}
mockBroker := sarama.NewMockBroker(t, 0)

mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(mockBroker.Addr(), mockBroker.BrokerID()).
SetLeader(topics[0], 0, mockBroker.BrokerID()).
SetController(mockBroker.BrokerID()),
})
return mockBroker
}

0 comments on commit be69aea

Please sign in to comment.